001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.Optional;
022import org.apache.hadoop.hbase.HConstants;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.hadoop.hbase.client.RegionInfo;
026import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
027import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
028import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
029import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
030import org.apache.hadoop.hbase.procedure2.Procedure;
031import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
032import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
033import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
034import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
035import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
036import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
037import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
038import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
039import org.apache.hadoop.hbase.util.RetryCounter;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
049
050/**
051 * The base class for the remote procedures used to open/close a region.
052 * <p/>
053 * Notice that here we do not care about the result of the remote call, if the remote call is
054 * finished, either succeeded or not, we will always finish the procedure. The parent procedure
055 * should take care of the result and try to reschedule if the result is not good.
056 */
057@InterfaceAudience.Private
058public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedureEnv>
059  implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
060
061  private static final Logger LOG = LoggerFactory.getLogger(RegionRemoteProcedureBase.class);
062
063  protected RegionInfo region;
064
065  protected ServerName targetServer;
066
067  private RegionRemoteProcedureBaseState state =
068    RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
069
070  private TransitionCode transitionCode;
071
072  private long seqId;
073
074  private RetryCounter retryCounter;
075
076  protected RegionRemoteProcedureBase() {
077  }
078
079  protected RegionRemoteProcedureBase(TransitRegionStateProcedure parent, RegionInfo region,
080    ServerName targetServer) {
081    this.region = region;
082    this.targetServer = targetServer;
083    parent.attachRemoteProc(this);
084  }
085
086  @Override
087  public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
088    ServerName remote) {
089    // REPORT_SUCCEED means that this remote open/close request already executed in RegionServer.
090    // So return empty operation and RSProcedureDispatcher no need to send it again.
091    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
092      return Optional.empty();
093    }
094    return Optional.of(newRemoteOperation());
095  }
096
097  protected abstract RemoteProcedureDispatcher.RemoteOperation newRemoteOperation();
098
099  @Override
100  public void remoteOperationCompleted(MasterProcedureEnv env) {
101    // should not be called since we use reportRegionStateTransition to report the result
102    throw new UnsupportedOperationException();
103  }
104
105  @Override
106  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
107    // should not be called since we use reportRegionStateTransition to report the result
108    throw new UnsupportedOperationException();
109  }
110
111  private RegionStateNode getRegionNode(MasterProcedureEnv env) {
112    return env.getAssignmentManager().getRegionStates().getRegionStateNode(region);
113  }
114
115  @Override
116  public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
117    RegionStateNode regionNode = getRegionNode(env);
118    regionNode.lock();
119    try {
120      if (!env.getMasterServices().getServerManager().isServerOnline(remote)) {
121        // the SCP will interrupt us, give up
122        LOG.debug("{} for region {}, targetServer {} is dead, SCP will interrupt us, give up", this,
123          regionNode, remote);
124        return;
125      }
126      if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
127        // not sure how can this happen but anyway let's add a check here to avoid waking the wrong
128        // procedure...
129        LOG.warn("{} for region {}, targetServer={} has already been woken up, ignore", this,
130          regionNode, remote);
131        return;
132      }
133      LOG.warn("The remote operation {} for region {} to server {} failed", this, regionNode,
134        remote, exception);
135      // It is OK to not persist the state here, as we do not need to change the region state if the
136      // remote call is failed. If the master crashed before we actually execute the procedure and
137      // persist the new state, it is fine to retry on the same target server again.
138      state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH_FAIL;
139      regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
140    } finally {
141      regionNode.unlock();
142    }
143  }
144
145  @Override
146  public TableName getTableName() {
147    return region.getTable();
148  }
149
150  @Override
151  protected boolean waitInitialized(MasterProcedureEnv env) {
152    if (TableName.isMetaTableName(getTableName())) {
153      return false;
154    }
155    // First we need meta to be loaded, and second, if meta is not online then we will likely to
156    // fail when updating meta so we wait until it is assigned.
157    AssignmentManager am = env.getAssignmentManager();
158    return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, region);
159  }
160
161  @Override
162  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
163    throw new UnsupportedOperationException();
164  }
165
166  @Override
167  protected boolean abort(MasterProcedureEnv env) {
168    return false;
169  }
170
171  // do some checks to see if the report is valid
172  protected abstract void checkTransition(RegionStateNode regionNode, TransitionCode transitionCode,
173    long seqId) throws UnexpectedStateException;
174
175  // change the in memory state of the regionNode, but do not update meta.
176  protected abstract void updateTransitionWithoutPersistingToMeta(MasterProcedureEnv env,
177    RegionStateNode regionNode, TransitionCode transitionCode, long seqId) throws IOException;
178
179  // A bit strange but the procedure store will throw RuntimeException if we can not persist the
180  // state, so upper layer should take care of this...
181  private void persistAndWake(MasterProcedureEnv env, RegionStateNode regionNode) {
182    env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
183    regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
184  }
185
186  // should be called with RegionStateNode locked, to avoid race with the execute method below
187  void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName,
188    TransitionCode transitionCode, long seqId) throws IOException {
189    if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
190      // should be a retry
191      return;
192    }
193    if (!targetServer.equals(serverName)) {
194      throw new UnexpectedStateException("Received report from " + serverName + ", expected "
195        + targetServer + ", " + regionNode + ", proc=" + this);
196    }
197    checkTransition(regionNode, transitionCode, seqId);
198    // this state means we have received the report from RS, does not mean the result is fine, as we
199    // may received a FAILED_OPEN.
200    this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
201    this.transitionCode = transitionCode;
202    this.seqId = seqId;
203    // Persist the transition code and openSeqNum(if provided).
204    // We should not update the hbase:meta directly as this may cause races when master restarts,
205    // as the old active master may incorrectly report back to RS and cause the new master to hang
206    // on a OpenRegionProcedure forever. See HBASE-22060 and HBASE-22074 for more details.
207    boolean succ = false;
208    try {
209      persistAndWake(env, regionNode);
210      succ = true;
211    } finally {
212      if (!succ) {
213        this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
214        this.transitionCode = null;
215        this.seqId = HConstants.NO_SEQNUM;
216      }
217    }
218    try {
219      updateTransitionWithoutPersistingToMeta(env, regionNode, transitionCode, seqId);
220    } catch (IOException e) {
221      throw new AssertionError("should not happen", e);
222    }
223  }
224
225  void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) {
226    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH) {
227      // should be a retry
228      return;
229    }
230    RegionRemoteProcedureBaseState oldState = state;
231    // it is possible that the state is in REGION_REMOTE_PROCEDURE_SERVER_CRASH, think of this
232    // sequence
233    // 1. region is open on the target server and the above reportTransition call is succeeded
234    // 2. before we are woken up and update the meta, the target server crashes, and then we arrive
235    // here
236    this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH;
237    boolean succ = false;
238    try {
239      persistAndWake(env, regionNode);
240      succ = true;
241    } finally {
242      if (!succ) {
243        this.state = oldState;
244      }
245    }
246  }
247
248  protected abstract void restoreSucceedState(AssignmentManager am, RegionStateNode regionNode,
249    long seqId) throws IOException;
250
251  void stateLoaded(AssignmentManager am, RegionStateNode regionNode) {
252    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
253      try {
254        restoreSucceedState(am, regionNode, seqId);
255      } catch (IOException e) {
256        // should not happen as we are just restoring the state
257        throw new AssertionError(e);
258      }
259    }
260  }
261
262  private TransitRegionStateProcedure getParent(MasterProcedureEnv env) {
263    return (TransitRegionStateProcedure) env.getMasterServices().getMasterProcedureExecutor()
264      .getProcedure(getParentProcId());
265  }
266
267  private void unattach(MasterProcedureEnv env) {
268    getParent(env).unattachRemoteProc(this);
269  }
270
271  @Override
272  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
273    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
274    RegionStateNode regionNode = getRegionNode(env);
275    regionNode.lock();
276    try {
277      switch (state) {
278        case REGION_REMOTE_PROCEDURE_DISPATCH: {
279          // The code which wakes us up also needs to lock the RSN so here we do not need to
280          // synchronize
281          // on the event.
282          ProcedureEvent<?> event = regionNode.getProcedureEvent();
283          try {
284            env.getRemoteDispatcher().addOperationToNode(targetServer, this);
285          } catch (FailedRemoteDispatchException e) {
286            LOG.warn("Can not add remote operation {} for region {} to server {}, this usually "
287              + "because the server is alread dead, give up and mark the procedure as complete, "
288              + "the parent procedure will take care of this.", this, region, targetServer, e);
289            unattach(env);
290            return null;
291          }
292          event.suspend();
293          event.suspendIfNotReady(this);
294          throw new ProcedureSuspendedException();
295        }
296        case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
297          env.getAssignmentManager().persistToMeta(regionNode);
298          unattach(env);
299          return null;
300        case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
301          // the remote call is failed so we do not need to change the region state, just return.
302          unattach(env);
303          return null;
304        case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
305          env.getAssignmentManager().regionClosedAbnormally(regionNode);
306          unattach(env);
307          return null;
308        default:
309          throw new IllegalStateException("Unknown state: " + state);
310      }
311    } catch (IOException e) {
312      if (retryCounter == null) {
313        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
314      }
315      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
316      LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
317      setTimeout(Math.toIntExact(backoff));
318      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
319      skipPersistence();
320      throw new ProcedureSuspendedException();
321    } finally {
322      regionNode.unlock();
323    }
324  }
325
326  @Override
327  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
328    setState(ProcedureProtos.ProcedureState.RUNNABLE);
329    env.getProcedureScheduler().addFront(this);
330    return false; // 'false' means that this procedure handled the timeout
331  }
332
333  @Override
334  public boolean storeInDispatchedQueue() {
335    return false;
336  }
337
338  @Override
339  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
340    RegionRemoteProcedureBaseStateData.Builder builder =
341      RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
342        .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
343    if (transitionCode != null) {
344      builder.setTransitionCode(transitionCode);
345      builder.setSeqId(seqId);
346    }
347    serializer.serialize(builder.build());
348  }
349
350  @Override
351  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
352    RegionRemoteProcedureBaseStateData data =
353      serializer.deserialize(RegionRemoteProcedureBaseStateData.class);
354    region = ProtobufUtil.toRegionInfo(data.getRegion());
355    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
356    // 'state' may not be present if we are reading an 'old' form of this pb Message.
357    if (data.hasState()) {
358      state = data.getState();
359    }
360    if (data.hasTransitionCode()) {
361      transitionCode = data.getTransitionCode();
362      seqId = data.getSeqId();
363    }
364  }
365
366  @Override
367  protected void afterReplay(MasterProcedureEnv env) {
368    getParent(env).attachRemoteProc(this);
369  }
370
371  @Override
372  public String getProcName() {
373    return getClass().getSimpleName() + " " + region.getEncodedName();
374  }
375
376  @Override
377  protected void toStringClassDetails(StringBuilder builder) {
378    builder.append(getProcName());
379    if (targetServer != null) {
380      builder.append(", server=");
381      builder.append(this.targetServer);
382    }
383    if (this.retryCounter != null) {
384      builder.append(", retry=");
385      builder.append(this.retryCounter);
386    }
387  }
388}