001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
057
058@Category({ MasterTests.class, LargeTests.class })
059
060public class TestSplitWALManager {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestSplitWALManager.class);
065
066  private static HBaseTestingUtility TEST_UTIL;
067  private HMaster master;
068  private SplitWALManager splitWALManager;
069  private TableName TABLE_NAME;
070  private byte[] FAMILY;
071
072  @Before
073  public void setup() throws Exception {
074    TEST_UTIL = new HBaseTestingUtility();
075    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
076    TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
077    TEST_UTIL.startMiniCluster(3);
078    master = TEST_UTIL.getHBaseCluster().getMaster();
079    splitWALManager = master.getSplitWALManager();
080    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
081    FAMILY = Bytes.toBytes("test");
082  }
083
084  @After
085  public void teardown() throws Exception {
086    TEST_UTIL.shutdownMiniCluster();
087  }
088
089  @Test
090  public void testAcquireAndRelease() throws Exception {
091    List<FakeServerProcedure> testProcedures = new ArrayList<>();
092    for (int i = 0; i < 4; i++) {
093      testProcedures.add(new FakeServerProcedure(
094          TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
095    }
096    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
097    Assert.assertNotNull(server);
098    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
099    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
100
101    Exception e = null;
102    try {
103      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
104    } catch (ProcedureSuspendedException suspendException) {
105      e = suspendException;
106    }
107    Assert.assertNotNull(e);
108    Assert.assertTrue(e instanceof ProcedureSuspendedException);
109
110    splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
111        .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
112    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
113  }
114
115  @Test
116  public void testAddNewServer() throws Exception {
117    List<FakeServerProcedure> testProcedures = new ArrayList<>();
118    for (int i = 0; i < 4; i++) {
119      testProcedures.add(new FakeServerProcedure(
120          TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
121    }
122    ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
123    Assert.assertNotNull(server);
124    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
125    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
126
127    Exception e = null;
128    try {
129      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
130    } catch (ProcedureSuspendedException suspendException) {
131      e = suspendException;
132    }
133    Assert.assertNotNull(e);
134    Assert.assertTrue(e instanceof ProcedureSuspendedException);
135
136    JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
137    newServer.waitForServerOnline();
138    Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
139  }
140
141  @Test
142  public void testCreateSplitWALProcedures() throws Exception {
143    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
144    // load table
145    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
146    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
147    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
148    Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
149        AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
150    // Test splitting meta wal
151    FileStatus[] wals =
152        TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
153    Assert.assertEquals(1, wals.length);
154    List<Procedure> testProcedures =
155        splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
156    Assert.assertEquals(1, testProcedures.size());
157    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
158    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
159
160    // Test splitting wal
161    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
162    Assert.assertEquals(1, wals.length);
163    testProcedures =
164        splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
165    Assert.assertEquals(1, testProcedures.size());
166    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
167    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
168  }
169
170  @Test
171  public void testAcquireAndReleaseSplitWALWorker() throws Exception {
172    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
173    List<FakeServerProcedure> testProcedures = new ArrayList<>();
174    for (int i = 0; i < 3; i++) {
175      FakeServerProcedure procedure =
176          new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
177      testProcedures.add(procedure);
178      ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
179          HConstants.NO_NONCE);
180    }
181    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
182    FakeServerProcedure failedProcedure =
183        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
184    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
185        HConstants.NO_NONCE);
186    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
187    Assert.assertFalse(failedProcedure.isWorkerAcquired());
188    // let one procedure finish and release worker
189    testProcedures.get(0).countDown();
190    TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
191    Assert.assertTrue(testProcedures.get(0).isSuccess());
192  }
193
194  @Test
195  public void testGetWALsToSplit() throws Exception {
196    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
197    // load table
198    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
199    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
200    List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
201    Assert.assertEquals(1, metaWals.size());
202    List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
203    Assert.assertEquals(1, wals.size());
204    ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
205        .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
206        .get();
207    metaWals = splitWALManager.getWALsToSplit(testServer, true);
208    Assert.assertEquals(0, metaWals.size());
209  }
210
211  @Test
212  public void testSplitLogs() throws Exception {
213    TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
214    // load table
215    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
216    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
217    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
218    ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
219        .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
220        .get();
221    List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
222    Assert.assertEquals(1, procedures.size());
223    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
224    Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
225
226    procedures = splitWALManager.splitWALs(metaServer, true);
227    Assert.assertEquals(1, procedures.size());
228    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
229    Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
230    Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
231  }
232
233  @Test
234  public void testWorkerReloadWhenMasterRestart() throws Exception {
235    List<FakeServerProcedure> testProcedures = new ArrayList<>();
236    for (int i = 0; i < 3; i++) {
237      FakeServerProcedure procedure =
238          new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
239      testProcedures.add(procedure);
240      ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
241        HConstants.NO_NONCE, HConstants.NO_NONCE);
242    }
243    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
244    // Kill master
245    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
246    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
247    // restart master
248    TEST_UTIL.getHBaseCluster().startMaster();
249    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
250    this.master = TEST_UTIL.getHBaseCluster().getMaster();
251
252    FakeServerProcedure failedProcedure =
253        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
254    ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
255      HConstants.NO_NONCE, HConstants.NO_NONCE);
256    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
257    Assert.assertFalse(failedProcedure.isWorkerAcquired());
258    for (int i = 0; i < 3; i++) {
259      testProcedures.get(i).countDown();
260    }
261    failedProcedure.countDown();
262  }
263
264  public static final class FakeServerProcedure
265      extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
266      implements ServerProcedureInterface {
267
268    private ServerName serverName;
269    private ServerName worker;
270    private CountDownLatch barrier = new CountDownLatch(1);
271    private boolean triedToAcquire = false;
272
273    public FakeServerProcedure() {
274    }
275
276    public FakeServerProcedure(ServerName serverName) {
277      this.serverName = serverName;
278    }
279
280    public ServerName getServerName() {
281      return serverName;
282    }
283
284    @Override
285    public boolean hasMetaTableRegion() {
286      return false;
287    }
288
289    @Override
290    public ServerOperationType getServerOperationType() {
291      return SPLIT_WAL;
292    }
293
294    @Override
295    protected Flow executeFromState(MasterProcedureEnv env,
296                                    MasterProcedureProtos.SplitWALState state)
297        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
298      SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
299      switch (state) {
300        case ACQUIRE_SPLIT_WAL_WORKER:
301          triedToAcquire = true;
302          worker = splitWALManager.acquireSplitWALWorker(this);
303          setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
304          return Flow.HAS_MORE_STATE;
305        case DISPATCH_WAL_TO_WORKER:
306          barrier.await();
307          setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
308          return Flow.HAS_MORE_STATE;
309        case RELEASE_SPLIT_WORKER:
310          splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
311          return Flow.NO_MORE_STATE;
312        default:
313          throw new UnsupportedOperationException("unhandled state=" + state);
314      }
315    }
316
317    public boolean isWorkerAcquired() {
318      return worker != null;
319    }
320
321    public boolean isTriedToAcquire() {
322      return triedToAcquire;
323    }
324
325    public void countDown() {
326      this.barrier.countDown();
327    }
328
329    @Override
330    protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
331        throws IOException, InterruptedException {
332
333    }
334
335    @Override
336    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
337      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
338    }
339
340    @Override
341    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
342      return state.getNumber();
343    }
344
345    @Override
346    protected MasterProcedureProtos.SplitWALState getInitialState() {
347      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
348    }
349
350    @Override
351    protected boolean holdLock(MasterProcedureEnv env) {
352      return true;
353    }
354
355    @Override
356    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
357
358    }
359
360    @Override
361    protected boolean abort(MasterProcedureEnv env) {
362      return false;
363    }
364
365    @Override
366    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
367      MasterProcedureProtos.SplitWALData.Builder builder =
368          MasterProcedureProtos.SplitWALData.newBuilder();
369      builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
370      serializer.serialize(builder.build());
371    }
372
373    @Override
374    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
375      MasterProcedureProtos.SplitWALData data =
376          serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
377      serverName = ProtobufUtil.toServerName(data.getCrashedServer());
378    }
379  }
380}