001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021 022import org.apache.hadoop.hbase.ServerName; 023import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; 024import org.apache.hadoop.hbase.procedure2.Procedure; 025import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 026import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 027import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 028import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 029import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034@InterfaceAudience.Private 035/** 036 * This extract the common used methods of procedures which are send to remote servers. Developers 037 * who extends this class only need to override remoteCallBuild() and complete(). This procedure 038 * will help add the operation to {@link RSProcedureDispatcher} 039 * 040 * If adding the operation to dispatcher failed, addOperationToNode will throw 041 * FailedRemoteDispatchException, and this procedure will return null which procedure Executor will 042 * mark this procedure as complete. Thus the upper layer of this procedure must have a way to 043 * check if this procedure really succeed and how to deal with it. 044 * 045 * If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to 046 * handle this, which actually call remoteOperationDone with the exception. 047 * If the targetServer crashed but this procedure has no response, than dispatcher will call 048 * remoteOperationFailed() to handle this, which also calls remoteOperationDone with the exception. 049 * If the operation is successful, then remoteOperationCompleted will be called and actually calls 050 * the remoteOperationDone without exception. 051 * 052 * In remoteOperationDone, we'll check if the procedure is already get wake up by others. Then 053 * developer could implement complete() based on their own purpose. 054 * 055 * But basic logic is that if operation succeed, set succ to true and do the clean work. 056 * 057 * If operation failed and require to resend it to the same server, leave the succ as false. 058 * 059 * If operation failed and require to resend it to another server, set succ to true and upper layer 060 * should be able to find out this operation not work and send a operation to another server. 061 */ 062public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv> 063 implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> { 064 protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class); 065 protected ProcedureEvent<?> event; 066 protected ServerName targetServer; 067 protected boolean dispatched; 068 protected boolean succ; 069 070 protected abstract void complete(MasterProcedureEnv env, Throwable error); 071 072 @Override 073 protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 074 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 075 if (dispatched) { 076 if (succ) { 077 return null; 078 } 079 dispatched = false; 080 } 081 try { 082 env.getRemoteDispatcher().addOperationToNode(targetServer, this); 083 } catch (FailedRemoteDispatchException frde) { 084 LOG.warn("Can not send remote operation {} to {}, this operation will " 085 + "be retried to send to another server", 086 this.getProcId(), targetServer); 087 return null; 088 } 089 dispatched = true; 090 event = new ProcedureEvent<>(this); 091 event.suspendIfNotReady(this); 092 throw new ProcedureSuspendedException(); 093 } 094 095 @Override 096 protected synchronized void completionCleanup(MasterProcedureEnv env) { 097 env.getRemoteDispatcher().removeCompletedOperation(targetServer, this); 098 } 099 100 @Override 101 public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, 102 IOException exception) { 103 remoteOperationDone(env, exception); 104 } 105 106 @Override 107 public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { 108 remoteOperationDone(env, null); 109 } 110 111 @Override 112 public synchronized void remoteOperationFailed(MasterProcedureEnv env, 113 RemoteProcedureException error) { 114 remoteOperationDone(env, error); 115 } 116 117 synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { 118 if (this.isFinished()) { 119 LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); 120 return; 121 } 122 if (event == null) { 123 LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", 124 getProcId()); 125 return; 126 } 127 complete(env, error); 128 event.wake(env.getProcedureScheduler()); 129 event = null; 130 } 131}