001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021
022import org.apache.hadoop.hbase.ServerName;
023import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
024import org.apache.hadoop.hbase.procedure2.Procedure;
025import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
026import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
027import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
028import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
029import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034@InterfaceAudience.Private
035/**
036 * This extract the common used methods of procedures which are send to remote servers. Developers
037 * who extends this class only need to override remoteCallBuild() and complete(). This procedure
038 * will help add the operation to {@link RSProcedureDispatcher}
039 *
040 * If adding the operation to dispatcher failed, addOperationToNode will throw
041 * FailedRemoteDispatchException, and this procedure will return null which procedure Executor will
042 * mark this procedure as complete. Thus the upper layer of this procedure must have a way to
043 * check if this procedure really succeed and how to deal with it.
044 *
045 * If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to
046 * handle this, which actually call remoteOperationDone with the exception.
047 * If the targetServer crashed but this procedure has no response, than dispatcher will call
048 * remoteOperationFailed() to handle this, which also calls remoteOperationDone with the exception.
049 * If the operation is successful, then remoteOperationCompleted will be called and actually calls
050 * the remoteOperationDone without exception.
051 *
052 * In remoteOperationDone, we'll check if the procedure is already get wake up by others. Then
053 * developer could implement complete() based on their own purpose.
054 *
055 * But basic logic is that if operation succeed, set succ to true and do the clean work.
056 *
057 * If operation failed and require to resend it to the same server, leave the succ as false.
058 *
059 * If operation failed and require to resend it to another server, set succ to true and upper layer
060 * should be able to find out this operation not work and send a operation to another server.
061 */
062public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv>
063    implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> {
064  protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class);
065  protected ProcedureEvent<?> event;
066  protected ServerName targetServer;
067  protected boolean dispatched;
068  protected boolean succ;
069
070  protected abstract void complete(MasterProcedureEnv env, Throwable error);
071
072  @Override
073  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
074      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
075    if (dispatched) {
076      if (succ) {
077        return null;
078      }
079      dispatched = false;
080    }
081    try {
082      env.getRemoteDispatcher().addOperationToNode(targetServer, this);
083    } catch (FailedRemoteDispatchException frde) {
084      LOG.warn("Can not send remote operation {} to {}, this operation will "
085          + "be retried to send to another server",
086        this.getProcId(), targetServer);
087      return null;
088    }
089    dispatched = true;
090    event = new ProcedureEvent<>(this);
091    event.suspendIfNotReady(this);
092    throw new ProcedureSuspendedException();
093  }
094
095  @Override
096  protected synchronized void completionCleanup(MasterProcedureEnv env) {
097    env.getRemoteDispatcher().removeCompletedOperation(targetServer, this);
098  }
099
100  @Override
101  public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
102      IOException exception) {
103    remoteOperationDone(env, exception);
104  }
105
106  @Override
107  public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
108    remoteOperationDone(env, null);
109  }
110
111  @Override
112  public synchronized void remoteOperationFailed(MasterProcedureEnv env,
113      RemoteProcedureException error) {
114    remoteOperationDone(env, error);
115  }
116
117  synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) {
118    if (this.isFinished()) {
119      LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId());
120      return;
121    }
122    if (event == null) {
123      LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
124          getProcId());
125      return;
126    }
127    complete(env, error);
128    event.wake(env.getProcedureScheduler());
129    event = null;
130  }
131}