001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertNotNull;
024import static org.junit.jupiter.api.Assertions.assertThrows;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.CountDownLatch;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
039import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
040import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
041import org.apache.hadoop.hbase.procedure2.Procedure;
042import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
043import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
044import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
045import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
046import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
047import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.MasterTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.CommonFSUtils;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.hadoop.hbase.util.JVMClusterUtil;
054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
055import org.junit.jupiter.api.AfterEach;
056import org.junit.jupiter.api.BeforeEach;
057import org.junit.jupiter.api.Tag;
058import org.junit.jupiter.api.Test;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
063
064import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
066
067@Tag(MasterTests.TAG)
068@Tag(LargeTests.TAG)
069public class TestSplitWALManager {
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALManager.class);
072  private static HBaseTestingUtil TEST_UTIL;
073  private HMaster master;
074  private SplitWALManager splitWALManager;
075  private TableName TABLE_NAME;
076  private byte[] FAMILY;
077
078  @BeforeEach
079  public void setUp() throws Exception {
080    TEST_UTIL = new HBaseTestingUtil();
081    TEST_UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
082    TEST_UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 5);
083    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
084    TEST_UTIL.startMiniCluster(3);
085    master = TEST_UTIL.getHBaseCluster().getMaster();
086    splitWALManager = master.getSplitWALManager();
087    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
088    FAMILY = Bytes.toBytes("test");
089  }
090
091  @AfterEach
092  public void tearDown() throws Exception {
093    TEST_UTIL.shutdownMiniCluster();
094  }
095
096  @Test
097  public void testAcquireAndRelease() throws Exception {
098    List<FakeServerProcedure> testProcedures = new ArrayList<>();
099    for (int i = 0; i < 4; i++) {
100      testProcedures.add(new FakeServerProcedure(
101        ServerName.valueOf("server" + i, 12345, EnvironmentEdgeManager.currentTime())));
102    }
103    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
104    procExec.submitProcedure(testProcedures.get(0));
105    TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isWorkerAcquired());
106    procExec.submitProcedure(testProcedures.get(1));
107    procExec.submitProcedure(testProcedures.get(2));
108    TEST_UTIL.waitFor(10000,
109      () -> testProcedures.get(1).isWorkerAcquired() && testProcedures.get(2).isWorkerAcquired());
110
111    // should get a ProcedureSuspendedException, so it will try to acquire but can not get a worker
112    procExec.submitProcedure(testProcedures.get(3));
113    TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isTriedToAcquire());
114    for (int i = 0; i < 3; i++) {
115      Thread.sleep(1000);
116      assertFalse(testProcedures.get(3).isWorkerAcquired());
117    }
118
119    // release a worker, the last procedure should be able to get a worker
120    testProcedures.get(0).countDown();
121    TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isWorkerAcquired());
122
123    for (int i = 1; i < 4; i++) {
124      testProcedures.get(i).countDown();
125    }
126    for (int i = 0; i < 4; i++) {
127      final int index = i;
128      TEST_UTIL.waitFor(10000, () -> testProcedures.get(index).isFinished());
129    }
130  }
131
132  @Test
133  public void testAddNewServer() throws Exception {
134    List<FakeServerProcedure> testProcedures = new ArrayList<>();
135    for (int i = 0; i < 4; i++) {
136      testProcedures.add(
137        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(1).getServerName()));
138    }
139    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
140    assertNotNull(server);
141    assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
142    assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
143
144    assertThrows(ProcedureSuspendedException.class,
145      () -> splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
146
147    JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
148    newServer.waitForServerOnline();
149    assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
150  }
151
152  @Test
153  public void testCreateSplitWALProcedures() throws Exception {
154    TEST_UTIL.createTable(TABLE_NAME, FAMILY, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
155    // load table
156    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
157    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
158    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
159    Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
160      AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
161    // Test splitting meta wal
162    FileStatus[] wals =
163      TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
164    assertEquals(1, wals.length);
165    List<Procedure> testProcedures =
166      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
167    assertEquals(1, testProcedures.size());
168    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
169    assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
170
171    // Test splitting wal
172    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
173    assertEquals(1, wals.length);
174    testProcedures =
175      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
176    assertEquals(1, testProcedures.size());
177    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
178    assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
179  }
180
181  @Test
182  public void testAcquireAndReleaseSplitWALWorker() throws Exception {
183    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
184    List<FakeServerProcedure> testProcedures = new ArrayList<>();
185    for (int i = 0; i < 3; i++) {
186      FakeServerProcedure procedure =
187        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
188      testProcedures.add(procedure);
189      ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
190        HConstants.NO_NONCE);
191    }
192    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
193    FakeServerProcedure failedProcedure =
194      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
195    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
196      HConstants.NO_NONCE);
197    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
198    assertFalse(failedProcedure.isWorkerAcquired());
199    // let one procedure finish and release worker
200    testProcedures.get(0).countDown();
201    TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
202    assertTrue(testProcedures.get(0).isSuccess());
203  }
204
205  @Test
206  public void testGetWALsToSplit() throws Exception {
207    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
208    // load table
209    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
210    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
211    List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
212    assertEquals(1, metaWals.size());
213    List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
214    assertEquals(1, wals.size());
215    ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
216      .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
217      .get();
218    metaWals = splitWALManager.getWALsToSplit(testServer, true);
219    assertEquals(0, metaWals.size());
220  }
221
222  private void splitLogsTestHelper(HBaseTestingUtil testUtil) throws Exception {
223    HMaster hmaster = testUtil.getHBaseCluster().getMaster();
224    SplitWALManager splitWALManager = hmaster.getSplitWALManager();
225    LOG.info(
226      "The Master FS is pointing to: " + hmaster.getMasterFileSystem().getFileSystem().getUri());
227    LOG.info(
228      "The WAL FS is pointing to: " + hmaster.getMasterFileSystem().getWALFileSystem().getUri());
229
230    testUtil.createTable(TABLE_NAME, FAMILY, testUtil.KEYS_FOR_HBA_CREATE_TABLE);
231    // load table
232    testUtil.loadTable(testUtil.getConnection().getTable(TABLE_NAME), FAMILY);
233    ProcedureExecutor<MasterProcedureEnv> masterPE = hmaster.getMasterProcedureExecutor();
234    ServerName metaServer = testUtil.getHBaseCluster().getServerHoldingMeta();
235    ServerName testServer = testUtil.getHBaseCluster().getRegionServerThreads().stream()
236      .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
237      .get();
238    List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
239    assertEquals(1, procedures.size());
240    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
241    assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
242
243    // Validate the old WAL file archive dir
244    Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir();
245    Path walArchivePath = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
246    FileSystem walFS = hmaster.getMasterFileSystem().getWALFileSystem();
247    int archiveFileCount = walFS.listStatus(walArchivePath).length;
248
249    procedures = splitWALManager.splitWALs(metaServer, true);
250    assertEquals(1, procedures.size());
251    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
252    assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
253    assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
254    // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish
255    assertEquals(archiveFileCount + 1, walFS.listStatus(walArchivePath).length,
256      "Splitted WAL files should be archived");
257  }
258
259  @Test
260  public void testSplitLogs() throws Exception {
261    splitLogsTestHelper(TEST_UTIL);
262  }
263
264  @Test
265  public void testSplitLogsWithDifferentWalAndRootFS() throws Exception {
266    HBaseTestingUtil testUtil2 = new HBaseTestingUtil();
267    testUtil2.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
268    testUtil2.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
269    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir");
270    testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, dir.toString());
271    CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir);
272    testUtil2.startMiniCluster(3);
273    splitLogsTestHelper(testUtil2);
274    testUtil2.shutdownMiniCluster();
275  }
276
277  @Test
278  public void testWorkerReloadWhenMasterRestart() throws Exception {
279    List<FakeServerProcedure> testProcedures = new ArrayList<>();
280    for (int i = 0; i < 3; i++) {
281      FakeServerProcedure procedure =
282        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
283      testProcedures.add(procedure);
284      ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
285        HConstants.NO_NONCE, HConstants.NO_NONCE);
286    }
287    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
288    // Kill master
289    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
290    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
291    // restart master
292    TEST_UTIL.getHBaseCluster().startMaster();
293    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
294    this.master = TEST_UTIL.getHBaseCluster().getMaster();
295
296    FakeServerProcedure failedProcedure =
297      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
298    ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
299      HConstants.NO_NONCE, HConstants.NO_NONCE);
300    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
301    assertFalse(failedProcedure.isWorkerAcquired());
302    for (int i = 0; i < 3; i++) {
303      testProcedures.get(i).countDown();
304    }
305    failedProcedure.countDown();
306  }
307
308  public static final class FakeServerProcedure
309    extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
310    implements ServerProcedureInterface {
311
312    private ServerName serverName;
313    private volatile ServerName worker;
314    private CountDownLatch barrier = new CountDownLatch(1);
315    private volatile boolean triedToAcquire = false;
316
317    public FakeServerProcedure() {
318    }
319
320    public FakeServerProcedure(ServerName serverName) {
321      this.serverName = serverName;
322    }
323
324    public ServerName getServerName() {
325      return serverName;
326    }
327
328    @Override
329    public boolean hasMetaTableRegion() {
330      return false;
331    }
332
333    @Override
334    public ServerOperationType getServerOperationType() {
335      return SPLIT_WAL;
336    }
337
338    @Override
339    protected Flow executeFromState(MasterProcedureEnv env,
340      MasterProcedureProtos.SplitWALState state)
341      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
342      SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
343      switch (state) {
344        case ACQUIRE_SPLIT_WAL_WORKER:
345          triedToAcquire = true;
346          worker = splitWALManager.acquireSplitWALWorker(this);
347          setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
348          return Flow.HAS_MORE_STATE;
349        case DISPATCH_WAL_TO_WORKER:
350          barrier.await();
351          setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
352          return Flow.HAS_MORE_STATE;
353        case RELEASE_SPLIT_WORKER:
354          splitWALManager.releaseSplitWALWorker(worker);
355          return Flow.NO_MORE_STATE;
356        default:
357          throw new UnsupportedOperationException("unhandled state=" + state);
358      }
359    }
360
361    public boolean isWorkerAcquired() {
362      return worker != null;
363    }
364
365    public boolean isTriedToAcquire() {
366      return triedToAcquire;
367    }
368
369    public void countDown() {
370      this.barrier.countDown();
371    }
372
373    @Override
374    protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
375      throws IOException, InterruptedException {
376
377    }
378
379    @Override
380    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
381      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
382    }
383
384    @Override
385    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
386      return state.getNumber();
387    }
388
389    @Override
390    protected MasterProcedureProtos.SplitWALState getInitialState() {
391      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
392    }
393
394    @Override
395    protected boolean holdLock(MasterProcedureEnv env) {
396      return true;
397    }
398
399    @Override
400    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
401
402    }
403
404    @Override
405    protected boolean abort(MasterProcedureEnv env) {
406      return false;
407    }
408
409    @Override
410    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
411      MasterProcedureProtos.SplitWALData.Builder builder =
412        MasterProcedureProtos.SplitWALData.newBuilder();
413      builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
414      serializer.serialize(builder.build());
415    }
416
417    @Override
418    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
419      MasterProcedureProtos.SplitWALData data =
420        serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
421      serverName = ProtobufUtil.toServerName(data.getCrashedServer());
422    }
423  }
424}