001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
045
046@Tag(MasterTests.TAG)
047@Tag(LargeTests.TAG)
048public class TestWALProcedureStoreOnHDFS {
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStoreOnHDFS.class);
051
052  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
053
054  private WALProcedureStore store;
055
056  private ProcedureStore.ProcedureStoreListener stopProcedureListener =
057    new ProcedureStore.ProcedureStoreListener() {
058      @Override
059      public void postSync() {
060      }
061
062      @Override
063      public void abortProcess() {
064        LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store");
065        store.stop(true);
066      }
067    };
068
069  @BeforeEach
070  public void initConfig() {
071    Configuration conf = UTIL.getConfiguration();
072
073    conf.setInt("dfs.replication", 3);
074    conf.setInt("dfs.namenode.replication.min", 3);
075
076    // increase the value for slow test-env
077    conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 1000);
078    conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 10);
079    conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 10);
080  }
081
082  // No @BeforeEach because some tests need to do additional config first
083  private void setupDFS() throws Exception {
084    Configuration conf = UTIL.getConfiguration();
085    MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
086    CommonFSUtils.setWALRootDir(conf, new Path(conf.get("fs.defaultFS"), "/tmp/wal"));
087
088    Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
089    store = ProcedureTestingUtility.createWalStore(conf, logDir);
090    store.registerListener(stopProcedureListener);
091    store.start(8);
092    store.recoverLease();
093  }
094
095  // No @AfterEach
096  @SuppressWarnings("JUnit4TearDownNotRun")
097  public void tearDown() throws Exception {
098    store.stop(false);
099    UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);
100
101    try {
102      UTIL.shutdownMiniCluster();
103    } catch (Exception e) {
104      LOG.warn("failure shutting down cluster", e);
105    }
106  }
107
108  @Test
109  public void testWalAbortOnLowReplication() throws Exception {
110    setupDFS();
111
112    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
113
114    LOG.info("Stop DataNode");
115    UTIL.getDFSCluster().stopDataNode(0);
116    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
117
118    assertThrows(RuntimeException.class, () -> {
119      store.insert(new TestProcedure(1, -1), null);
120      for (long i = 2; store.isRunning(); ++i) {
121        assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
122        store.insert(new TestProcedure(i, -1), null);
123        Thread.sleep(100);
124      }
125      assertFalse(store.isRunning());
126    });
127  }
128
129  @Test
130  public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
131    setupDFS();
132
133    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
134    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
135      @Override
136      public void postSync() {
137        Threads.sleepWithoutInterrupt(2000);
138      }
139
140      @Override
141      public void abortProcess() {
142      }
143    });
144
145    final AtomicInteger reCount = new AtomicInteger(0);
146    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
147    for (int i = 0; i < thread.length; ++i) {
148      final long procId = i + 1L;
149      thread[i] = new Thread(() -> {
150        try {
151          LOG.debug("[S] INSERT " + procId);
152          store.insert(new TestProcedure(procId, -1), null);
153          LOG.debug("[E] INSERT " + procId);
154        } catch (RuntimeException e) {
155          reCount.incrementAndGet();
156          LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
157        }
158      });
159      thread[i].start();
160    }
161
162    Thread.sleep(1000);
163    LOG.info("Stop DataNode");
164    UTIL.getDFSCluster().stopDataNode(0);
165    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
166
167    for (int i = 0; i < thread.length; ++i) {
168      thread[i].join();
169    }
170
171    assertFalse(store.isRunning());
172    assertTrue(reCount.get() >= store.getNumThreads() && reCount.get() < thread.length,
173      reCount.toString());
174  }
175
176  @Test
177  public void testWalRollOnLowReplication() throws Exception {
178    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
179    setupDFS();
180
181    int dnCount = 0;
182    store.insert(new TestProcedure(1, -1), null);
183    UTIL.getDFSCluster().restartDataNode(dnCount);
184    for (long i = 2; i < 100; ++i) {
185      store.insert(new TestProcedure(i, -1), null);
186      waitForNumReplicas(3);
187      Thread.sleep(100);
188      if ((i % 30) == 0) {
189        LOG.info("Restart Data Node");
190        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
191      }
192    }
193    assertTrue(store.isRunning());
194  }
195
196  public void waitForNumReplicas(int numReplicas) throws Exception {
197    while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
198      Thread.sleep(100);
199    }
200
201    for (int i = 0; i < numReplicas; ++i) {
202      for (DataNode dn : UTIL.getDFSCluster().getDataNodes()) {
203        while (!dn.isDatanodeFullyStarted()) {
204          Thread.sleep(100);
205        }
206      }
207    }
208  }
209}