001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.concurrent.locks.Lock;
023
024import org.apache.hadoop.hbase.executor.EventType;
025import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
026import org.apache.hadoop.hbase.util.KeyLocker;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
033
034/**
035 * This callable is used to do the real split WAL task. It is called by
036 * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} from master and executed
037 * by executor service which is in charge of executing the events of EventType.RS_LOG_REPLAY
038 *
039 * When execute this callable, it will call SplitLogWorker.splitLog() to split the WAL.
040 * If the return value is SplitLogWorker.TaskExecutor.Status.DONE, it means the task is successful
041 * and it will return null to end the call. Otherwise it will throw an exception and let
042 * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} to handle this problem.
043 *
044 * This class is to replace the zk-based WAL splitting related code, {@link SplitLogWorker},
045 * {@link org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination} and
046 * {@link org.apache.hadoop.hbase.coordination.ZkSplitLogWorkerCoordination} can be removed after
047 * we switch to procedure-based WAL splitting.
048 */
049@InterfaceAudience.Private
050public class SplitWALCallable implements RSProcedureCallable {
051  private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class);
052
053  private String walPath;
054  private Exception initError;
055  private HRegionServer rs;
056  private final KeyLocker<String> splitWALLocks = new KeyLocker<>();
057  private volatile Lock splitWALLock = null;
058
059
060  @Override
061  public void init(byte[] parameter, HRegionServer rs) {
062    try {
063      this.rs = rs;
064      MasterProcedureProtos.SplitWALParameter param =
065          MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
066      this.walPath = param.getWalPath();
067    } catch (InvalidProtocolBufferException e) {
068      LOG.error("parse proto buffer of split WAL request failed ", e);
069      initError = e;
070    }
071  }
072
073  @Override
074  public EventType getEventType() {
075    return EventType.RS_LOG_REPLAY;
076  }
077
078  @Override
079  public Void call() throws Exception {
080    if (initError != null) {
081      throw initError;
082    }
083    //grab a lock
084    splitWALLock = splitWALLocks.acquireLock(walPath);
085    try{
086      splitWal();
087      LOG.info("split WAL {} succeed.", walPath);
088    } catch (IOException e){
089      LOG.warn("failed to split WAL {}.", walPath, e);
090      throw e;
091    }
092    finally {
093      splitWALLock.unlock();
094    }
095    return null;
096  }
097
098  public String getWalPath() {
099    return this.walPath;
100  }
101
102  private void splitWal() throws IOException {
103    SplitLogWorker.TaskExecutor.Status status =
104        SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.walFactory);
105    if (status != SplitLogWorker.TaskExecutor.Status.DONE) {
106      throw new IOException("Split WAL " + walPath + " failed at server ");
107    }
108  }
109}