001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.util.concurrent.atomic.AtomicInteger;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.log.HBaseMarkers;
030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
034import org.apache.hadoop.hbase.testclassification.LargeTests;
035import org.apache.hadoop.hbase.testclassification.MasterTests;
036import org.apache.hadoop.hbase.util.CommonFSUtils;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.hadoop.hdfs.MiniDFSCluster;
039import org.apache.hadoop.hdfs.server.datanode.DataNode;
040import org.junit.Before;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047@Category({MasterTests.class, LargeTests.class})
048public class TestWALProcedureStoreOnHDFS {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052      HBaseClassTestRule.forClass(TestWALProcedureStoreOnHDFS.class);
053
054  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStoreOnHDFS.class);
055
056  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
057
058  private WALProcedureStore store;
059
060  private ProcedureStore.ProcedureStoreListener stopProcedureListener = new ProcedureStore.ProcedureStoreListener() {
061    @Override
062    public void postSync() {}
063
064    @Override
065    public void abortProcess() {
066      LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store");
067      store.stop(true);
068    }
069  };
070
071  @Before
072  public void initConfig() {
073    Configuration conf = UTIL.getConfiguration();
074
075    conf.setInt("dfs.replication", 3);
076    conf.setInt("dfs.namenode.replication.min", 3);
077
078    // increase the value for slow test-env
079    conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 1000);
080    conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 10);
081    conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 10);
082  }
083
084  // No @Before because some tests need to do additional config first
085  private void setupDFS() throws Exception {
086    Configuration conf = UTIL.getConfiguration();
087    MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
088    CommonFSUtils.setWALRootDir(conf, new Path(conf.get("fs.defaultFS"), "/tmp/wal"));
089
090    Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
091    store = ProcedureTestingUtility.createWalStore(conf, logDir);
092    store.registerListener(stopProcedureListener);
093    store.start(8);
094    store.recoverLease();
095  }
096
097  // No @After
098  @SuppressWarnings("JUnit4TearDownNotRun")
099  public void tearDown() throws Exception {
100    store.stop(false);
101    UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);
102
103    try {
104      UTIL.shutdownMiniCluster();
105    } catch (Exception e) {
106      LOG.warn("failure shutting down cluster", e);
107    }
108  }
109
110  @Test(expected=RuntimeException.class)
111  public void testWalAbortOnLowReplication() throws Exception {
112    setupDFS();
113
114    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
115
116    LOG.info("Stop DataNode");
117    UTIL.getDFSCluster().stopDataNode(0);
118    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
119
120    store.insert(new TestProcedure(1, -1), null);
121    for (long i = 2; store.isRunning(); ++i) {
122      assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
123      store.insert(new TestProcedure(i, -1), null);
124      Thread.sleep(100);
125    }
126    assertFalse(store.isRunning());
127  }
128
129  @Test
130  public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
131    setupDFS();
132
133    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
134    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
135      @Override
136      public void postSync() { Threads.sleepWithoutInterrupt(2000); }
137
138      @Override
139      public void abortProcess() {}
140    });
141
142    final AtomicInteger reCount = new AtomicInteger(0);
143    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
144    for (int i = 0; i < thread.length; ++i) {
145      final long procId = i + 1L;
146      thread[i] = new Thread(() -> {
147        try {
148          LOG.debug("[S] INSERT " + procId);
149          store.insert(new TestProcedure(procId, -1), null);
150          LOG.debug("[E] INSERT " + procId);
151        } catch (RuntimeException e) {
152          reCount.incrementAndGet();
153          LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
154        }
155      });
156      thread[i].start();
157    }
158
159    Thread.sleep(1000);
160    LOG.info("Stop DataNode");
161    UTIL.getDFSCluster().stopDataNode(0);
162    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
163
164    for (int i = 0; i < thread.length; ++i) {
165      thread[i].join();
166    }
167
168    assertFalse(store.isRunning());
169    assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
170                                   reCount.get() < thread.length);
171  }
172
173  @Test
174  public void testWalRollOnLowReplication() throws Exception {
175    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
176    setupDFS();
177
178    int dnCount = 0;
179    store.insert(new TestProcedure(1, -1), null);
180    UTIL.getDFSCluster().restartDataNode(dnCount);
181    for (long i = 2; i < 100; ++i) {
182      store.insert(new TestProcedure(i, -1), null);
183      waitForNumReplicas(3);
184      Thread.sleep(100);
185      if ((i % 30) == 0) {
186        LOG.info("Restart Data Node");
187        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
188      }
189    }
190    assertTrue(store.isRunning());
191  }
192
193  public void waitForNumReplicas(int numReplicas) throws Exception {
194    while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
195      Thread.sleep(100);
196    }
197
198    for (int i = 0; i < numReplicas; ++i) {
199      for (DataNode dn: UTIL.getDFSCluster().getDataNodes()) {
200        while (!dn.isDatanodeFullyStarted()) {
201          Thread.sleep(100);
202        }
203      }
204    }
205  }
206}