001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
068
069@Category({ MasterTests.class, LargeTests.class })
070public class TestSplitWALManager {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestSplitWALManager.class);
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALManager.class);
077  private static HBaseTestingUtil TEST_UTIL;
078  private HMaster master;
079  private SplitWALManager splitWALManager;
080  private TableName TABLE_NAME;
081  private byte[] FAMILY;
082
083  @Before
084  public void setUp() throws Exception {
085    TEST_UTIL = new HBaseTestingUtil();
086    TEST_UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
087    TEST_UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 5);
088    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
089    TEST_UTIL.startMiniCluster(3);
090    master = TEST_UTIL.getHBaseCluster().getMaster();
091    splitWALManager = master.getSplitWALManager();
092    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
093    FAMILY = Bytes.toBytes("test");
094  }
095
096  @After
097  public void tearDown() throws Exception {
098    TEST_UTIL.shutdownMiniCluster();
099  }
100
101  @Test
102  public void testAcquireAndRelease() throws Exception {
103    List<FakeServerProcedure> testProcedures = new ArrayList<>();
104    for (int i = 0; i < 4; i++) {
105      testProcedures.add(new FakeServerProcedure(
106        ServerName.valueOf("server" + i, 12345, EnvironmentEdgeManager.currentTime())));
107    }
108    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
109    procExec.submitProcedure(testProcedures.get(0));
110    TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isWorkerAcquired());
111    procExec.submitProcedure(testProcedures.get(1));
112    procExec.submitProcedure(testProcedures.get(2));
113    TEST_UTIL.waitFor(10000,
114      () -> testProcedures.get(1).isWorkerAcquired() && testProcedures.get(2).isWorkerAcquired());
115
116    // should get a ProcedureSuspendedException, so it will try to acquire but can not get a worker
117    procExec.submitProcedure(testProcedures.get(3));
118    TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isTriedToAcquire());
119    for (int i = 0; i < 3; i++) {
120      Thread.sleep(1000);
121      assertFalse(testProcedures.get(3).isWorkerAcquired());
122    }
123
124    // release a worker, the last procedure should be able to get a worker
125    testProcedures.get(0).countDown();
126    TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isWorkerAcquired());
127
128    for (int i = 1; i < 4; i++) {
129      testProcedures.get(i).countDown();
130    }
131    for (int i = 0; i < 4; i++) {
132      final int index = i;
133      TEST_UTIL.waitFor(10000, () -> testProcedures.get(index).isFinished());
134    }
135  }
136
137  @Test
138  public void testAddNewServer() throws Exception {
139    List<FakeServerProcedure> testProcedures = new ArrayList<>();
140    for (int i = 0; i < 4; i++) {
141      testProcedures.add(
142        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(1).getServerName()));
143    }
144    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
145    assertNotNull(server);
146    assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
147    assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
148
149    assertThrows(ProcedureSuspendedException.class,
150      () -> splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
151
152    JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
153    newServer.waitForServerOnline();
154    assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
155  }
156
157  @Test
158  public void testCreateSplitWALProcedures() throws Exception {
159    TEST_UTIL.createTable(TABLE_NAME, FAMILY, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
160    // load table
161    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
162    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
163    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
164    Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
165      AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
166    // Test splitting meta wal
167    FileStatus[] wals =
168      TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
169    assertEquals(1, wals.length);
170    List<Procedure> testProcedures =
171      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
172    assertEquals(1, testProcedures.size());
173    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
174    assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
175
176    // Test splitting wal
177    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
178    assertEquals(1, wals.length);
179    testProcedures =
180      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
181    assertEquals(1, testProcedures.size());
182    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
183    assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
184  }
185
186  @Test
187  public void testAcquireAndReleaseSplitWALWorker() throws Exception {
188    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
189    List<FakeServerProcedure> testProcedures = new ArrayList<>();
190    for (int i = 0; i < 3; i++) {
191      FakeServerProcedure procedure =
192        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
193      testProcedures.add(procedure);
194      ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
195        HConstants.NO_NONCE);
196    }
197    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
198    FakeServerProcedure failedProcedure =
199      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
200    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
201      HConstants.NO_NONCE);
202    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
203    assertFalse(failedProcedure.isWorkerAcquired());
204    // let one procedure finish and release worker
205    testProcedures.get(0).countDown();
206    TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
207    assertTrue(testProcedures.get(0).isSuccess());
208  }
209
210  @Test
211  public void testGetWALsToSplit() throws Exception {
212    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
213    // load table
214    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
215    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
216    List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
217    assertEquals(1, metaWals.size());
218    List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
219    assertEquals(1, wals.size());
220    ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
221      .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
222      .get();
223    metaWals = splitWALManager.getWALsToSplit(testServer, true);
224    assertEquals(0, metaWals.size());
225  }
226
227  private void splitLogsTestHelper(HBaseTestingUtil testUtil) throws Exception {
228    HMaster hmaster = testUtil.getHBaseCluster().getMaster();
229    SplitWALManager splitWALManager = hmaster.getSplitWALManager();
230    LOG.info(
231      "The Master FS is pointing to: " + hmaster.getMasterFileSystem().getFileSystem().getUri());
232    LOG.info(
233      "The WAL FS is pointing to: " + hmaster.getMasterFileSystem().getWALFileSystem().getUri());
234
235    testUtil.createTable(TABLE_NAME, FAMILY, testUtil.KEYS_FOR_HBA_CREATE_TABLE);
236    // load table
237    testUtil.loadTable(testUtil.getConnection().getTable(TABLE_NAME), FAMILY);
238    ProcedureExecutor<MasterProcedureEnv> masterPE = hmaster.getMasterProcedureExecutor();
239    ServerName metaServer = testUtil.getHBaseCluster().getServerHoldingMeta();
240    ServerName testServer = testUtil.getHBaseCluster().getRegionServerThreads().stream()
241      .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
242      .get();
243    List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
244    assertEquals(1, procedures.size());
245    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
246    assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
247
248    // Validate the old WAL file archive dir
249    Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir();
250    Path walArchivePath = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
251    FileSystem walFS = hmaster.getMasterFileSystem().getWALFileSystem();
252    int archiveFileCount = walFS.listStatus(walArchivePath).length;
253
254    procedures = splitWALManager.splitWALs(metaServer, true);
255    assertEquals(1, procedures.size());
256    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
257    assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
258    assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
259    // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish
260    assertEquals("Splitted WAL files should be archived", archiveFileCount + 1,
261      walFS.listStatus(walArchivePath).length);
262  }
263
264  @Test
265  public void testSplitLogs() throws Exception {
266    splitLogsTestHelper(TEST_UTIL);
267  }
268
269  @Test
270  public void testSplitLogsWithDifferentWalAndRootFS() throws Exception {
271    HBaseTestingUtil testUtil2 = new HBaseTestingUtil();
272    testUtil2.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
273    testUtil2.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
274    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir");
275    testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, dir.toString());
276    CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir);
277    testUtil2.startMiniCluster(3);
278    splitLogsTestHelper(testUtil2);
279    testUtil2.shutdownMiniCluster();
280  }
281
282  @Test
283  public void testWorkerReloadWhenMasterRestart() throws Exception {
284    List<FakeServerProcedure> testProcedures = new ArrayList<>();
285    for (int i = 0; i < 3; i++) {
286      FakeServerProcedure procedure =
287        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
288      testProcedures.add(procedure);
289      ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
290        HConstants.NO_NONCE, HConstants.NO_NONCE);
291    }
292    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
293    // Kill master
294    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
295    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
296    // restart master
297    TEST_UTIL.getHBaseCluster().startMaster();
298    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
299    this.master = TEST_UTIL.getHBaseCluster().getMaster();
300
301    FakeServerProcedure failedProcedure =
302      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
303    ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
304      HConstants.NO_NONCE, HConstants.NO_NONCE);
305    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
306    assertFalse(failedProcedure.isWorkerAcquired());
307    for (int i = 0; i < 3; i++) {
308      testProcedures.get(i).countDown();
309    }
310    failedProcedure.countDown();
311  }
312
313  public static final class FakeServerProcedure
314    extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
315    implements ServerProcedureInterface {
316
317    private ServerName serverName;
318    private volatile ServerName worker;
319    private CountDownLatch barrier = new CountDownLatch(1);
320    private volatile boolean triedToAcquire = false;
321
322    public FakeServerProcedure() {
323    }
324
325    public FakeServerProcedure(ServerName serverName) {
326      this.serverName = serverName;
327    }
328
329    public ServerName getServerName() {
330      return serverName;
331    }
332
333    @Override
334    public boolean hasMetaTableRegion() {
335      return false;
336    }
337
338    @Override
339    public ServerOperationType getServerOperationType() {
340      return SPLIT_WAL;
341    }
342
343    @Override
344    protected Flow executeFromState(MasterProcedureEnv env,
345      MasterProcedureProtos.SplitWALState state)
346      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
347      SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
348      switch (state) {
349        case ACQUIRE_SPLIT_WAL_WORKER:
350          triedToAcquire = true;
351          worker = splitWALManager.acquireSplitWALWorker(this);
352          setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
353          return Flow.HAS_MORE_STATE;
354        case DISPATCH_WAL_TO_WORKER:
355          barrier.await();
356          setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
357          return Flow.HAS_MORE_STATE;
358        case RELEASE_SPLIT_WORKER:
359          splitWALManager.releaseSplitWALWorker(worker);
360          return Flow.NO_MORE_STATE;
361        default:
362          throw new UnsupportedOperationException("unhandled state=" + state);
363      }
364    }
365
366    public boolean isWorkerAcquired() {
367      return worker != null;
368    }
369
370    public boolean isTriedToAcquire() {
371      return triedToAcquire;
372    }
373
374    public void countDown() {
375      this.barrier.countDown();
376    }
377
378    @Override
379    protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
380      throws IOException, InterruptedException {
381
382    }
383
384    @Override
385    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
386      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
387    }
388
389    @Override
390    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
391      return state.getNumber();
392    }
393
394    @Override
395    protected MasterProcedureProtos.SplitWALState getInitialState() {
396      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
397    }
398
399    @Override
400    protected boolean holdLock(MasterProcedureEnv env) {
401      return true;
402    }
403
404    @Override
405    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
406
407    }
408
409    @Override
410    protected boolean abort(MasterProcedureEnv env) {
411      return false;
412    }
413
414    @Override
415    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
416      MasterProcedureProtos.SplitWALData.Builder builder =
417        MasterProcedureProtos.SplitWALData.newBuilder();
418      builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
419      serializer.serialize(builder.build());
420    }
421
422    @Override
423    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
424      MasterProcedureProtos.SplitWALData data =
425        serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
426      serverName = ProtobufUtil.toServerName(data.getCrashedServer());
427    }
428  }
429}