001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
060
061@Category({ MasterTests.class, MediumTests.class })
062
063public class TestSplitWALManager {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestSplitWALManager.class);
068
069  private static HBaseTestingUtility TEST_UTIL;
070  private HMaster master;
071  private SplitWALManager splitWALManager;
072  private TableName TABLE_NAME;
073  private byte[] FAMILY;
074
075  @Before
076  public void setup() throws Exception {
077    TEST_UTIL = new HBaseTestingUtility();
078    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
079    TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
080    TEST_UTIL.startMiniCluster(3);
081    master = TEST_UTIL.getHBaseCluster().getMaster();
082    splitWALManager = master.getSplitWALManager();
083    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
084    FAMILY = Bytes.toBytes("test");
085  }
086
087  @After
088  public void teardown() throws Exception {
089    TEST_UTIL.shutdownMiniCluster();
090  }
091
092  @Test
093  public void testAcquireAndRelease() throws Exception {
094    List<FakeServerProcedure> testProcedures = new ArrayList<>();
095    for (int i = 0; i < 4; i++) {
096      testProcedures.add(new FakeServerProcedure(
097          TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
098    }
099    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
100    Assert.assertNotNull(server);
101    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
102    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
103
104    Exception e = null;
105    try {
106      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
107    } catch (ProcedureSuspendedException suspendException) {
108      e = suspendException;
109    }
110    Assert.assertNotNull(e);
111    Assert.assertTrue(e instanceof ProcedureSuspendedException);
112
113    splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
114        .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
115    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
116  }
117
118  @Test
119  public void testAddNewServer() throws Exception {
120    List<FakeServerProcedure> testProcedures = new ArrayList<>();
121    for (int i = 0; i < 4; i++) {
122      testProcedures.add(new FakeServerProcedure(
123          TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
124    }
125    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
126    Assert.assertNotNull(server);
127    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
128    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
129
130    Exception e = null;
131    try {
132      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
133    } catch (ProcedureSuspendedException suspendException) {
134      e = suspendException;
135    }
136    Assert.assertNotNull(e);
137    Assert.assertTrue(e instanceof ProcedureSuspendedException);
138
139    JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
140    newServer.waitForServerOnline();
141    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
142  }
143
144  @Test
145  public void testCreateSplitWALProcedures() throws Exception {
146    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
147    // load table
148    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
149    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
150    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
151    Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
152        AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
153    // Test splitting meta wal
154    FileStatus[] wals =
155        TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
156    Assert.assertEquals(1, wals.length);
157    List<Procedure> testProcedures =
158        splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
159    Assert.assertEquals(1, testProcedures.size());
160    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
161    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
162
163    // Test splitting wal
164    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
165    Assert.assertEquals(1, wals.length);
166    testProcedures =
167        splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
168    Assert.assertEquals(1, testProcedures.size());
169    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
170    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
171  }
172
173  @Test
174  public void testAcquireAndReleaseSplitWALWorker() throws Exception {
175    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
176    List<FakeServerProcedure> testProcedures = new ArrayList<>();
177    for (int i = 0; i < 3; i++) {
178      FakeServerProcedure procedure =
179          new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
180      testProcedures.add(procedure);
181      ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
182          HConstants.NO_NONCE);
183    }
184    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
185    FakeServerProcedure failedProcedure =
186        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
187    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
188        HConstants.NO_NONCE);
189    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
190    Assert.assertFalse(failedProcedure.isWorkerAcquired());
191    // let one procedure finish and release worker
192    testProcedures.get(0).countDown();
193    TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
194    Assert.assertTrue(testProcedures.get(0).isSuccess());
195  }
196
197  @Test
198  public void testGetWALsToSplit() throws Exception {
199    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
200    // load table
201    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
202    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
203    List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
204    Assert.assertEquals(1, metaWals.size());
205    List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
206    Assert.assertEquals(1, wals.size());
207    ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
208        .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
209        .get();
210    metaWals = splitWALManager.getWALsToSplit(testServer, true);
211    Assert.assertEquals(0, metaWals.size());
212  }
213
214  @Test
215  public void testSplitLogs() throws Exception {
216    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
217    // load table
218    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
219    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
220    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
221    ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
222        .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
223        .get();
224    List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
225    Assert.assertEquals(1, procedures.size());
226    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
227    Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
228
229    procedures = splitWALManager.splitWALs(metaServer, true);
230    Assert.assertEquals(1, procedures.size());
231    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
232    Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
233    Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
234  }
235
236  @Test
237  public void testWorkerReloadWhenMasterRestart() throws Exception {
238    List<FakeServerProcedure> testProcedures = new ArrayList<>();
239    for (int i = 0; i < 3; i++) {
240      FakeServerProcedure procedure =
241          new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
242      testProcedures.add(procedure);
243      ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
244        HConstants.NO_NONCE, HConstants.NO_NONCE);
245    }
246    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
247    // Kill master
248    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
249    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
250    // restart master
251    TEST_UTIL.getHBaseCluster().startMaster();
252    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
253    this.master = TEST_UTIL.getHBaseCluster().getMaster();
254
255    FakeServerProcedure failedProcedure =
256        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
257    ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
258      HConstants.NO_NONCE, HConstants.NO_NONCE);
259    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
260    Assert.assertFalse(failedProcedure.isWorkerAcquired());
261    for (int i = 0; i < 3; i++) {
262      testProcedures.get(i).countDown();
263    }
264    failedProcedure.countDown();
265  }
266
267  public static final class FakeServerProcedure
268      extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
269      implements ServerProcedureInterface {
270
271    private ServerName serverName;
272    private ServerName worker;
273    private CountDownLatch barrier = new CountDownLatch(1);
274    private boolean triedToAcquire = false;
275
276    public FakeServerProcedure() {
277    }
278
279    public FakeServerProcedure(ServerName serverName) {
280      this.serverName = serverName;
281    }
282
283    public ServerName getServerName() {
284      return serverName;
285    }
286
287    @Override
288    public boolean hasMetaTableRegion() {
289      return false;
290    }
291
292    @Override
293    public ServerOperationType getServerOperationType() {
294      return SPLIT_WAL;
295    }
296
297    @Override
298    protected Flow executeFromState(MasterProcedureEnv env,
299                                    MasterProcedureProtos.SplitWALState state)
300        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
301      SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
302      switch (state) {
303        case ACQUIRE_SPLIT_WAL_WORKER:
304          triedToAcquire = true;
305          worker = splitWALManager.acquireSplitWALWorker(this);
306          setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
307          return Flow.HAS_MORE_STATE;
308        case DISPATCH_WAL_TO_WORKER:
309          barrier.await();
310          setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
311          return Flow.HAS_MORE_STATE;
312        case RELEASE_SPLIT_WORKER:
313          splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
314          return Flow.NO_MORE_STATE;
315        default:
316          throw new UnsupportedOperationException("unhandled state=" + state);
317      }
318    }
319
320    public boolean isWorkerAcquired() {
321      return worker != null;
322    }
323
324    public boolean isTriedToAcquire() {
325      return triedToAcquire;
326    }
327
328    public void countDown() {
329      this.barrier.countDown();
330    }
331
332    @Override
333    protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
334        throws IOException, InterruptedException {
335
336    }
337
338    @Override
339    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
340      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
341    }
342
343    @Override
344    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
345      return state.getNumber();
346    }
347
348    @Override
349    protected MasterProcedureProtos.SplitWALState getInitialState() {
350      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
351    }
352
353    @Override
354    protected boolean holdLock(MasterProcedureEnv env) {
355      return true;
356    }
357
358    @Override
359    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
360
361    }
362
363    @Override
364    protected boolean abort(MasterProcedureEnv env) {
365      return false;
366    }
367
368    @Override
369    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
370      MasterProcedureProtos.SplitWALData.Builder builder =
371          MasterProcedureProtos.SplitWALData.newBuilder();
372      builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
373      serializer.serialize(builder.build());
374    }
375
376    @Override
377    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
378      MasterProcedureProtos.SplitWALData data =
379          serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
380      serverName = ProtobufUtil.toServerName(data.getCrashedServer());
381    }
382  }
383}