001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019import java.util.concurrent.locks.Lock;
020import org.apache.hadoop.hbase.HBaseIOException;
021import org.apache.hadoop.hbase.executor.EventType;
022import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
023import org.apache.hadoop.hbase.util.KeyLocker;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
028import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
029
030/**
031 * This callable is used to do the real split WAL task. It is called by
032 * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} from master and executed
033 * by executor service which is in charge of executing the events of EventType.RS_LOG_REPLAY
034 *
035 * When execute this callable, it will call SplitLogWorker.splitLog() to split the WAL.
036 * If the return value is SplitLogWorker.TaskExecutor.Status.DONE, it means the task is successful
037 * and it will return null to end the call. Otherwise it will throw an exception and let
038 * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} to handle this problem.
039 *
040 * This class is to replace the zk-based WAL splitting related code, {@link SplitLogWorker},
041 * {@link org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination} and
042 * {@link org.apache.hadoop.hbase.coordination.ZkSplitLogWorkerCoordination} can be removed after
043 * we switch to procedure-based WAL splitting.
044 */
045@InterfaceAudience.Private
046public class SplitWALCallable implements RSProcedureCallable {
047  private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class);
048
049  private String walPath;
050  private Exception initError;
051  private HRegionServer rs;
052  private final KeyLocker<String> splitWALLocks = new KeyLocker<>();
053  private volatile Lock splitWALLock = null;
054
055
056  @Override
057  public void init(byte[] parameter, HRegionServer rs) {
058    try {
059      this.rs = rs;
060      MasterProcedureProtos.SplitWALParameter param =
061          MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
062      this.walPath = param.getWalPath();
063    } catch (InvalidProtocolBufferException e) {
064      LOG.error("Parse proto buffer of split WAL request failed ", e);
065      initError = e;
066    }
067  }
068
069  @Override
070  public EventType getEventType() {
071    return EventType.RS_LOG_REPLAY;
072  }
073
074  public static class PreemptedWALSplitException extends HBaseIOException {
075    PreemptedWALSplitException(String wal) {
076      super(wal);
077    }
078  }
079
080  public static class ResignedWALSplitException extends HBaseIOException {
081    ResignedWALSplitException(String wal) {
082      super(wal);
083    }
084  }
085
086  public static class ErrorWALSplitException extends HBaseIOException {
087    ErrorWALSplitException(String wal) {
088      super(wal);
089    }
090  }
091
092  @Override
093  public Void call() throws Exception {
094    if (initError != null) {
095      throw initError;
096    }
097    //grab a lock
098    splitWALLock = splitWALLocks.acquireLock(walPath);
099    try {
100      switch (SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.getWalFactory())) {
101        case DONE:
102          break;
103        case PREEMPTED:
104          throw new PreemptedWALSplitException(this.walPath);
105        case RESIGNED:
106          throw new ResignedWALSplitException(this.walPath);
107        default:
108          throw new ErrorWALSplitException(this.walPath);
109      }
110    } finally {
111      splitWALLock.unlock();
112    }
113    return null;
114  }
115
116  public String getWalPath() {
117    return this.walPath;
118  }
119}