001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021 022import org.apache.hadoop.hbase.ServerName; 023import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; 024import org.apache.hadoop.hbase.procedure2.Procedure; 025import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 026import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 027import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 028import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 029import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034@InterfaceAudience.Private 035/** 036 * The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote) 037 * RegionServer; e.g. asking a RegionServer to split a WAL file as a sub-procedure of 038 * the ServerCrashProcedure recovery process. 039 * 040 * <p>To implement a new Procedure type, extend this class and override remoteCallBuild() and 041 * complete(). The dispatch and callback will be handled for you here, internally. 042 * 043 * <p>The Procedure works as follows. It uses {@link RSProcedureDispatcher}, the same system 044 * used dispatching Region OPEN and CLOSE RPCs, to pass a Callable to a RegionServer. Examples 045 * include {@link org.apache.hadoop.hbase.regionserver.SplitWALCallable} and 046 * {@link org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable}. Rather than 047 * assign/unassign, the Master calls #executeProcedures against the remote RegionServer wrapping 048 * a Callable in a {@link ExecuteProceduresRequest}. Upon successful dispatch, 049 * the Procedure then suspends itself on the Master-side and relinqushes its executor worker. 050 * On receipt, the RegionServer submits the Callable to its executor service. When the Callable 051 * completes, it adds itself to a queue on the RegionServer side for processing by a background 052 * thread, the {@link RemoteProcedureResultReporter}. It picks up the completed Callable from the 053 * queue and RPCs the master at #reportProcedureDone with the procedure id and whether success or 054 * failure. The master calls complete() setting success or failure state and then reschedules the 055 * suspended Procedure so it can finish. 056 * 057 * <p>Here are some details on operation: 058 * <p>If adding the operation to the dispatcher fails, addOperationToNode will throw 059 * FailedRemoteDispatchException, and this Procedure will return 'null'. The Procedure Executor 060 * will then mark this procedure as 'complete' (though we failed to dispatch our task). In this 061 * case, the upper layer of this procedure must have a way to check if this Procedure really 062 * succeeded or not and have appropriate handling. 063 * 064 * <p>If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to 065 * handle this which calls remoteOperationDone with the exception. If the targetServer crashed but 066 * this procedure has no response, than dispatcher will call remoteOperationFailed() which also 067 * calls remoteOperationDone with the exception. If the operation is successful, then 068 * remoteOperationCompleted will be called and actually calls the remoteOperationDone without 069 * exception. 070 * 071 * In remoteOperationDone, we'll check if the procedure is already get wake up by others. Then 072 * developer could implement complete() based on their own purpose. 073 * 074 * But basic logic is that if operation succeed, set succ to true and do the clean work. 075 * 076 * If operation failed and require to resend it to the same server, leave the succ as false. 077 * 078 * If operation failed and require to resend it to another server, set succ to true and upper layer 079 * should be able to find out this operation not work and send a operation to another server. 080 */ 081public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv> 082 implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> { 083 protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class); 084 protected ProcedureEvent<?> event; 085 protected ServerName targetServer; 086 protected boolean dispatched; 087 protected boolean succ; 088 089 protected abstract void complete(MasterProcedureEnv env, Throwable error); 090 091 @Override 092 protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 093 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 094 if (dispatched) { 095 if (succ) { 096 return null; 097 } 098 dispatched = false; 099 } 100 try { 101 env.getRemoteDispatcher().addOperationToNode(targetServer, this); 102 } catch (FailedRemoteDispatchException frde) { 103 LOG.warn("Can not send remote operation {} to {}, this operation will " 104 + "be retried to send to another server", 105 this.getProcId(), targetServer); 106 return null; 107 } 108 dispatched = true; 109 event = new ProcedureEvent<>(this); 110 event.suspendIfNotReady(this); 111 throw new ProcedureSuspendedException(); 112 } 113 114 @Override 115 protected synchronized void completionCleanup(MasterProcedureEnv env) { 116 env.getRemoteDispatcher().removeCompletedOperation(targetServer, this); 117 } 118 119 @Override 120 public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, 121 IOException exception) { 122 remoteOperationDone(env, exception); 123 } 124 125 @Override 126 public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { 127 remoteOperationDone(env, null); 128 } 129 130 @Override 131 public synchronized void remoteOperationFailed(MasterProcedureEnv env, 132 RemoteProcedureException error) { 133 remoteOperationDone(env, error); 134 } 135 136 synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { 137 if (this.isFinished()) { 138 LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); 139 return; 140 } 141 if (event == null) { 142 LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", 143 getProcId()); 144 return; 145 } 146 complete(env, error); 147 event.wake(env.getProcedureScheduler()); 148 event = null; 149 } 150}