001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
022import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.CountDownLatch;
028import org.apache.hadoop.fs.FileStatus;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
037import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
038import org.apache.hadoop.hbase.procedure2.Procedure;
039import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
041import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
043import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
044import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.testclassification.MasterTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.CommonFSUtils;
049import org.apache.hadoop.hbase.util.JVMClusterUtil;
050import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
051import org.junit.After;
052import org.junit.Assert;
053import org.junit.Before;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
061
062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
064
065@Category({ MasterTests.class, LargeTests.class })
066
067public class TestSplitWALManager {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestSplitWALManager.class);
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALManager.class);
074  private static HBaseTestingUtility TEST_UTIL;
075  private HMaster master;
076  private SplitWALManager splitWALManager;
077  private TableName TABLE_NAME;
078  private byte[] FAMILY;
079
080  @Before
081  public void setup() throws Exception {
082    TEST_UTIL = new HBaseTestingUtility();
083    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
084    TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
085    TEST_UTIL.startMiniCluster(3);
086    master = TEST_UTIL.getHBaseCluster().getMaster();
087    splitWALManager = master.getSplitWALManager();
088    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
089    FAMILY = Bytes.toBytes("test");
090  }
091
092  @After
093  public void teardown() throws Exception {
094    TEST_UTIL.shutdownMiniCluster();
095  }
096
097  @Test
098  public void testAcquireAndRelease() throws Exception {
099    List<FakeServerProcedure> testProcedures = new ArrayList<>();
100    for (int i = 0; i < 4; i++) {
101      testProcedures
102        .add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
103    }
104    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
105    Assert.assertNotNull(server);
106    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
107    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
108
109    Exception e = null;
110    try {
111      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
112    } catch (ProcedureSuspendedException suspendException) {
113      e = suspendException;
114    }
115    Assert.assertNotNull(e);
116    Assert.assertTrue(e instanceof ProcedureSuspendedException);
117
118    splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
119      .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
120    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
121  }
122
123  @Test
124  public void testAddNewServer() throws Exception {
125    List<FakeServerProcedure> testProcedures = new ArrayList<>();
126    for (int i = 0; i < 4; i++) {
127      testProcedures
128        .add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
129    }
130    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
131    Assert.assertNotNull(server);
132    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
133    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
134
135    Exception e = null;
136    try {
137      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
138    } catch (ProcedureSuspendedException suspendException) {
139      e = suspendException;
140    }
141    Assert.assertNotNull(e);
142    Assert.assertTrue(e instanceof ProcedureSuspendedException);
143
144    JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
145    newServer.waitForServerOnline();
146    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
147  }
148
149  @Test
150  public void testCreateSplitWALProcedures() throws Exception {
151    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
152    // load table
153    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
154    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
155    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
156    Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
157      AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
158    // Test splitting meta wal
159    FileStatus[] wals =
160      TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
161    Assert.assertEquals(1, wals.length);
162    List<Procedure> testProcedures =
163      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
164    Assert.assertEquals(1, testProcedures.size());
165    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
166    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
167
168    // Test splitting wal
169    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
170    Assert.assertEquals(1, wals.length);
171    testProcedures =
172      splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
173    Assert.assertEquals(1, testProcedures.size());
174    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
175    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
176  }
177
178  @Test
179  public void testAcquireAndReleaseSplitWALWorker() throws Exception {
180    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
181    List<FakeServerProcedure> testProcedures = new ArrayList<>();
182    for (int i = 0; i < 3; i++) {
183      FakeServerProcedure procedure =
184        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
185      testProcedures.add(procedure);
186      ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
187        HConstants.NO_NONCE);
188    }
189    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
190    FakeServerProcedure failedProcedure =
191      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
192    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
193      HConstants.NO_NONCE);
194    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
195    Assert.assertFalse(failedProcedure.isWorkerAcquired());
196    // let one procedure finish and release worker
197    testProcedures.get(0).countDown();
198    TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
199    Assert.assertTrue(testProcedures.get(0).isSuccess());
200  }
201
202  @Test
203  public void testGetWALsToSplit() throws Exception {
204    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
205    // load table
206    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
207    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
208    List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
209    Assert.assertEquals(1, metaWals.size());
210    List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
211    Assert.assertEquals(1, wals.size());
212    ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
213      .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
214      .get();
215    metaWals = splitWALManager.getWALsToSplit(testServer, true);
216    Assert.assertEquals(0, metaWals.size());
217  }
218
219  private void splitLogsTestHelper(HBaseTestingUtility testUtil) throws Exception {
220    HMaster hmaster = testUtil.getHBaseCluster().getMaster();
221    SplitWALManager splitWALManager = hmaster.getSplitWALManager();
222    LOG.info(
223      "The Master FS is pointing to: " + hmaster.getMasterFileSystem().getFileSystem().getUri());
224    LOG.info(
225      "The WAL FS is pointing to: " + hmaster.getMasterFileSystem().getWALFileSystem().getUri());
226
227    testUtil.createTable(TABLE_NAME, FAMILY, testUtil.KEYS_FOR_HBA_CREATE_TABLE);
228    // load table
229    testUtil.loadTable(testUtil.getConnection().getTable(TABLE_NAME), FAMILY);
230    ProcedureExecutor<MasterProcedureEnv> masterPE = hmaster.getMasterProcedureExecutor();
231    ServerName metaServer = testUtil.getHBaseCluster().getServerHoldingMeta();
232    ServerName testServer = testUtil.getHBaseCluster().getRegionServerThreads().stream()
233      .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
234      .get();
235    List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
236    Assert.assertEquals(1, procedures.size());
237    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
238    Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
239
240    // Validate the old WAL file archive dir
241    Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir();
242    Path walArchivePath = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
243    FileSystem walFS = hmaster.getMasterFileSystem().getWALFileSystem();
244    int archiveFileCount = walFS.listStatus(walArchivePath).length;
245
246    procedures = splitWALManager.splitWALs(metaServer, true);
247    Assert.assertEquals(1, procedures.size());
248    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
249    Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
250    Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
251    // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish
252    Assert.assertEquals("Splitted WAL files should be archived", archiveFileCount + 1,
253      walFS.listStatus(walArchivePath).length);
254  }
255
256  @Test
257  public void testSplitLogs() throws Exception {
258    splitLogsTestHelper(TEST_UTIL);
259  }
260
261  @Test
262  public void testSplitLogsWithDifferentWalAndRootFS() throws Exception {
263    HBaseTestingUtility testUtil2 = new HBaseTestingUtility();
264    testUtil2.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
265    testUtil2.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
266    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir");
267    testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, dir.toString());
268    CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir);
269    testUtil2.startMiniCluster(3);
270    splitLogsTestHelper(testUtil2);
271    testUtil2.shutdownMiniCluster();
272  }
273
274  @Test
275  public void testWorkerReloadWhenMasterRestart() throws Exception {
276    List<FakeServerProcedure> testProcedures = new ArrayList<>();
277    for (int i = 0; i < 3; i++) {
278      FakeServerProcedure procedure =
279        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
280      testProcedures.add(procedure);
281      ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
282        HConstants.NO_NONCE, HConstants.NO_NONCE);
283    }
284    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
285    // Kill master
286    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
287    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
288    // restart master
289    TEST_UTIL.getHBaseCluster().startMaster();
290    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
291    this.master = TEST_UTIL.getHBaseCluster().getMaster();
292
293    FakeServerProcedure failedProcedure =
294      new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
295    ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
296      HConstants.NO_NONCE, HConstants.NO_NONCE);
297    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
298    Assert.assertFalse(failedProcedure.isWorkerAcquired());
299    for (int i = 0; i < 3; i++) {
300      testProcedures.get(i).countDown();
301    }
302    failedProcedure.countDown();
303  }
304
305  public static final class FakeServerProcedure
306    extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
307    implements ServerProcedureInterface {
308
309    private ServerName serverName;
310    private ServerName worker;
311    private CountDownLatch barrier = new CountDownLatch(1);
312    private boolean triedToAcquire = false;
313
314    public FakeServerProcedure() {
315    }
316
317    public FakeServerProcedure(ServerName serverName) {
318      this.serverName = serverName;
319    }
320
321    public ServerName getServerName() {
322      return serverName;
323    }
324
325    @Override
326    public boolean hasMetaTableRegion() {
327      return false;
328    }
329
330    @Override
331    public ServerOperationType getServerOperationType() {
332      return SPLIT_WAL;
333    }
334
335    @Override
336    protected Flow executeFromState(MasterProcedureEnv env,
337      MasterProcedureProtos.SplitWALState state)
338      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
339      SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
340      switch (state) {
341        case ACQUIRE_SPLIT_WAL_WORKER:
342          triedToAcquire = true;
343          worker = splitWALManager.acquireSplitWALWorker(this);
344          setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
345          return Flow.HAS_MORE_STATE;
346        case DISPATCH_WAL_TO_WORKER:
347          barrier.await();
348          setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
349          return Flow.HAS_MORE_STATE;
350        case RELEASE_SPLIT_WORKER:
351          splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
352          return Flow.NO_MORE_STATE;
353        default:
354          throw new UnsupportedOperationException("unhandled state=" + state);
355      }
356    }
357
358    public boolean isWorkerAcquired() {
359      return worker != null;
360    }
361
362    public boolean isTriedToAcquire() {
363      return triedToAcquire;
364    }
365
366    public void countDown() {
367      this.barrier.countDown();
368    }
369
370    @Override
371    protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
372      throws IOException, InterruptedException {
373
374    }
375
376    @Override
377    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
378      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
379    }
380
381    @Override
382    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
383      return state.getNumber();
384    }
385
386    @Override
387    protected MasterProcedureProtos.SplitWALState getInitialState() {
388      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
389    }
390
391    @Override
392    protected boolean holdLock(MasterProcedureEnv env) {
393      return true;
394    }
395
396    @Override
397    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
398
399    }
400
401    @Override
402    protected boolean abort(MasterProcedureEnv env) {
403      return false;
404    }
405
406    @Override
407    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
408      MasterProcedureProtos.SplitWALData.Builder builder =
409        MasterProcedureProtos.SplitWALData.newBuilder();
410      builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
411      serializer.serialize(builder.build());
412    }
413
414    @Override
415    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
416      MasterProcedureProtos.SplitWALData data =
417        serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
418      serverName = ProtobufUtil.toServerName(data.getCrashedServer());
419    }
420  }
421}