001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.Optional;
022import java.util.concurrent.CompletableFuture;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.RegionInfo;
027import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
028import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
029import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
030import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
031import org.apache.hadoop.hbase.procedure2.Procedure;
032import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
033import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
036import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
037import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
039import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
040import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
041import org.apache.hadoop.hbase.util.RetryCounter;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
052
053/**
054 * The base class for the remote procedures used to open/close a region.
055 * <p/>
056 * Notice that here we do not care about the result of the remote call, if the remote call is
057 * finished, either succeeded or not, we will always finish the procedure. The parent procedure
058 * should take care of the result and try to reschedule if the result is not good.
059 */
060@InterfaceAudience.Private
061public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedureEnv>
062  implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
063
  private static final Logger LOG = LoggerFactory.getLogger(RegionRemoteProcedureBase.class);

  // The region this procedure operates on; assigned by the constructor or by
  // deserializeStateData.
  protected RegionInfo region;

  // The server the remote operation is dispatched to; assigned by the constructor or by
  // deserializeStateData.
  protected ServerName targetServer;

  // Current step of this procedure; it selects the branch taken in execute(). It starts at
  // DISPATCH and is changed by remoteCallFailed, reportTransition and serverCrashed.
  private RegionRemoteProcedureBaseState state =
    RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;

  // Transition code recorded by reportTransition. serializeStateData writes it (and seqId) only
  // when it is non-null.
  private TransitionCode transitionCode;

  // Sequence id recorded together with transitionCode by reportTransition.
  private long seqId;

  // Created lazily in execute() the first time an IOException has to be retried with backoff.
  private RetryCounter retryCounter;

  // Passed to ProcedureFutureUtil through getFuture/setFuture in execute(). While it is non-null,
  // execute() does not release the region node lock in its finally block.
  private CompletableFuture<Void> future;
080
081  protected RegionRemoteProcedureBase() {
082  }
083
084  protected RegionRemoteProcedureBase(TransitRegionStateProcedure parent, RegionInfo region,
085    ServerName targetServer) {
086    this.region = region;
087    this.targetServer = targetServer;
088    parent.attachRemoteProc(this);
089  }
090
091  @Override
092  public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
093    ServerName remote) {
094    // REPORT_SUCCEED means that this remote open/close request already executed in RegionServer.
095    // So return empty operation and RSProcedureDispatcher no need to send it again.
096    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
097      return Optional.empty();
098    }
099    return Optional.of(newRemoteOperation());
100  }
101
  /**
   * Creates the concrete remote operation for this procedure. {@link #remoteCallBuild} wraps the
   * returned value in an {@link Optional} whenever the state is not
   * REGION_REMOTE_PROCEDURE_REPORT_SUCCEED.
   * @return the operation to send to the target server
   */
  protected abstract RemoteProcedureDispatcher.RemoteOperation newRemoteOperation();
103
  /**
   * Not supported by this procedure; always throws.
   * @throws UnsupportedOperationException always
   */
  @Override
  public void remoteOperationCompleted(MasterProcedureEnv env) {
    // should not be called since we use reportRegionStateTransition to report the result
    throw new UnsupportedOperationException();
  }
109
  /**
   * Not supported by this procedure; always throws.
   * @throws UnsupportedOperationException always
   */
  @Override
  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
    // should not be called since we use reportRegionStateTransition to report the result
    throw new UnsupportedOperationException();
  }
115
116  private RegionStateNode getRegionNode(MasterProcedureEnv env) {
117    return env.getAssignmentManager().getRegionStates().getRegionStateNode(region);
118  }
119
120  @Override
121  public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
122    RegionStateNode regionNode = getRegionNode(env);
123    regionNode.lock();
124    try {
125      if (!env.getMasterServices().getServerManager().isServerOnline(remote)) {
126        // the SCP will interrupt us, give up
127        LOG.debug("{} for region {}, targetServer {} is dead, SCP will interrupt us, give up", this,
128          regionNode, remote);
129        return;
130      }
131      if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
132        // not sure how can this happen but anyway let's add a check here to avoid waking the wrong
133        // procedure...
134        LOG.warn("{} for region {}, targetServer={} has already been woken up, ignore", this,
135          regionNode, remote);
136        return;
137      }
138      LOG.warn("The remote operation {} for region {} to server {} failed", this, regionNode,
139        remote, exception);
140      // It is OK to not persist the state here, as we do not need to change the region state if the
141      // remote call is failed. If the master crashed before we actually execute the procedure and
142      // persist the new state, it is fine to retry on the same target server again.
143      state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH_FAIL;
144      regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
145    } finally {
146      regionNode.unlock();
147    }
148  }
149
150  @Override
151  public TableName getTableName() {
152    return region.getTable();
153  }
154
155  @Override
156  protected boolean waitInitialized(MasterProcedureEnv env) {
157    if (TableName.isMetaTableName(getTableName())) {
158      return false;
159    }
160    // First we need meta to be loaded, and second, if meta is not online then we will likely to
161    // fail when updating meta so we wait until it is assigned.
162    AssignmentManager am = env.getAssignmentManager();
163    return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, region);
164  }
165
  /**
   * Rollback is not supported by this procedure; always throws.
   * @throws UnsupportedOperationException always
   */
  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }
170
  /**
   * @return always {@code false} (presumably meaning the abort request is not honoured; confirm
   *         against the contract of the overridden method)
   */
  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }
175
  /**
   * Does some checks to see if the report is valid. {@link #reportTransition} calls this after it
   * has verified that the report comes from {@code targetServer} and before it changes any state.
   * @param regionNode     the node of the region the report is about
   * @param transitionCode the transition code carried by the report
   * @param seqId          the sequence id carried by the report
   * @throws UnexpectedStateException if the report is not acceptable
   */
  protected abstract void checkTransition(RegionStateNode regionNode, TransitionCode transitionCode,
    long seqId) throws UnexpectedStateException;
179
  /**
   * Changes the in memory state of the regionNode, but does not update meta.
   * {@link #reportTransition} calls this after the procedure has been persisted, and rethrows any
   * {@link IOException} from here as an {@link AssertionError}.
   * @param env            the master procedure environment
   * @param regionNode     the node whose in memory state should be changed
   * @param transitionCode the transition code that was reported
   * @param seqId          the sequence id that was reported
   */
  protected abstract void updateTransitionWithoutPersistingToMeta(MasterProcedureEnv env,
    RegionStateNode regionNode, TransitionCode transitionCode, long seqId) throws IOException;
183
184  // A bit strange but the procedure store will throw RuntimeException if we can not persist the
185  // state, so upper layer should take care of this...
186  private void persistAndWake(MasterProcedureEnv env, RegionStateNode regionNode) {
187    // The synchronization here is to guard with ProcedureExecutor.executeRollback, as here we will
188    // not hold the procedure execution lock, but we should not persist a procedure in ROLLEDBACK
189    // state to the procedure store.
190    // The ProcedureStore.update must be inside the lock, so here the check for procedure state and
191    // update could be atomic. In ProcedureExecutor.cleanupAfterRollbackOneStep, we will set the
192    // state to ROLLEDBACK, which will hold the same lock too as the Procedure.setState method is
193    // synchronized. This is the key to keep us safe.
194    synchronized (this) {
195      if (getState() == ProcedureState.ROLLEDBACK) {
196        LOG.warn("Procedure {} has already been rolled back, skip persistent", this);
197        return;
198      }
199      env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
200    }
201    regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
202  }
203
  /**
   * Records a region state transition reported by a region server.
   * <p/>
   * Should be called with RegionStateNode locked, to avoid race with the execute method below.
   * <p/>
   * Reports that arrive when the state is no longer DISPATCH are ignored. Otherwise the report is
   * validated, the state moves to REPORT_SUCCEED together with the reported transition code and
   * sequence id, the procedure is persisted and woken, and finally the in memory state of the
   * region node is updated. If persisting fails, the three fields are reset before the failure
   * propagates.
   * @param env            the master procedure environment
   * @param regionNode     the node of the region the report is about
   * @param serverName     the server that sent the report
   * @param transitionCode the reported transition code
   * @param seqId          the reported sequence id
   * @throws UnexpectedStateException if the report does not come from {@code targetServer}, or
   *                                  if {@link #checkTransition} rejects it
   */
  void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName,
    TransitionCode transitionCode, long seqId) throws IOException {
    if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
      // should be a retry
      return;
    }
    if (!targetServer.equals(serverName)) {
      throw new UnexpectedStateException("Received report from " + serverName + ", expected "
        + targetServer + ", " + regionNode + ", proc=" + this);
    }
    checkTransition(regionNode, transitionCode, seqId);
    // this state means we have received the report from RS, does not mean the result is fine, as we
    // may received a FAILED_OPEN.
    this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
    this.transitionCode = transitionCode;
    this.seqId = seqId;
    // Persist the transition code and openSeqNum(if provided).
    // We should not update the hbase:meta directly as this may cause races when master restarts,
    // as the old active master may incorrectly report back to RS and cause the new master to hang
    // on a OpenRegionProcedure forever. See HBASE-22060 and HBASE-22074 for more details.
    boolean succ = false;
    try {
      persistAndWake(env, regionNode);
      succ = true;
    } finally {
      // persistAndWake threw: undo the field changes made above so that a later report is not
      // mistaken for a retry by the state check at the top of this method.
      if (!succ) {
        this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
        this.transitionCode = null;
        this.seqId = HConstants.NO_SEQNUM;
      }
    }
    try {
      updateTransitionWithoutPersistingToMeta(env, regionNode, transitionCode, seqId);
    } catch (IOException e) {
      // The in memory update is not expected to fail, so surface it as an assertion failure.
      throw new AssertionError("should not happen", e);
    }
  }
242
  /**
   * Records that the target server has crashed: moves the state to SERVER_CRASH, then persists
   * the procedure and wakes it. If persisting fails, the previous state is restored before the
   * failure propagates.
   * @param env        the master procedure environment
   * @param regionNode the node of the region this procedure operates on
   * @param serverName the crashed server (not consulted here)
   */
  void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) {
    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH) {
      // should be a retry
      return;
    }
    RegionRemoteProcedureBaseState oldState = state;
    // The state is not necessarily DISPATCH at this point, it can already be
    // REGION_REMOTE_PROCEDURE_REPORT_SUCCEED. Think of this sequence:
    // 1. region is open on the target server and the above reportTransition call is succeeded
    // 2. before we are woken up and update the meta, the target server crashes, and then we arrive
    // here
    // That is why the previous state is remembered instead of assuming DISPATCH.
    this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH;
    boolean succ = false;
    try {
      persistAndWake(env, regionNode);
      succ = true;
    } finally {
      // persistAndWake threw: put the previous state back.
      if (!succ) {
        this.state = oldState;
      }
    }
  }
265
  /**
   * Restores state for a procedure that was loaded in the REPORT_SUCCEED state.
   * {@link #stateLoaded} calls this with the loaded sequence id and rethrows any
   * {@link IOException} from here as an {@link AssertionError}.
   * @param am         the assignment manager
   * @param regionNode the node of the region this procedure operates on
   * @param seqId      the sequence id held by this procedure
   */
  protected abstract void restoreSucceedState(AssignmentManager am, RegionStateNode regionNode,
    long seqId) throws IOException;
268
269  void stateLoaded(AssignmentManager am, RegionStateNode regionNode) {
270    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
271      try {
272        restoreSucceedState(am, regionNode, seqId);
273      } catch (IOException e) {
274        // should not happen as we are just restoring the state
275        throw new AssertionError(e);
276      }
277    }
278  }
279
280  private TransitRegionStateProcedure getParent(MasterProcedureEnv env) {
281    return (TransitRegionStateProcedure) env.getMasterServices().getMasterProcedureExecutor()
282      .getProcedure(getParentProcId());
283  }
284
285  private void unattach(MasterProcedureEnv env) {
286    getParent(env).unattachRemoteProc(this);
287  }
288
289  private CompletableFuture<Void> getFuture() {
290    return future;
291  }
292
293  private void setFuture(CompletableFuture<Void> f) {
294    future = f;
295  }
296
  /**
   * Runs the step selected by {@code state}, with the region's {@link RegionStateNode} locked by
   * this procedure.
   * <ul>
   * <li>DISPATCH: adds this procedure to the remote dispatcher for {@code targetServer} and then
   * suspends on the region node's procedure event. If the dispatcher throws
   * {@link FailedRemoteDispatchException}, the procedure detaches from its parent and
   * finishes.</li>
   * <li>REPORT_SUCCEED: drives {@code AssignmentManager.persistToMeta} through
   * {@link ProcedureFutureUtil}, detaching from the parent as the final action.</li>
   * <li>DISPATCH_FAIL: detaches from the parent and finishes.</li>
   * <li>SERVER_CRASH: drives {@code AssignmentManager.regionClosedAbnormally} through
   * {@link ProcedureFutureUtil}, detaching from the parent as the final action.</li>
   * </ul>
   * An {@link IOException} from any step makes the procedure suspend with a backoff taken from
   * {@code retryCounter}. The region node lock is released on exit unless {@code future} is set.
   * @return {@code null} whenever the method returns normally
   * @throws ProcedureSuspendedException when waiting for the remote report, and (presumably, as
   *                                     the result of {@code suspend}) when backing off after an
   *                                     IOException
   * @throws IllegalStateException       if {@code state} is none of the four handled values
   */
  @Override
  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    RegionStateNode regionNode = getRegionNode(env);
    // The lock may still be held from an earlier invocation that left a future pending (see the
    // finally block below), so only acquire it if we do not own it yet.
    if (!regionNode.isLockedBy(this)) {
      regionNode.lock(this, () -> ProcedureFutureUtil.wakeUp(this, env));
    }
    try {
      switch (state) {
        case REGION_REMOTE_PROCEDURE_DISPATCH: {
          // The code which wakes us up also needs to lock the RSN so here we do not need to
          // synchronize on the event.
          ProcedureEvent<?> event = regionNode.getProcedureEvent();
          try {
            env.getRemoteDispatcher().addOperationToNode(targetServer, this);
          } catch (FailedRemoteDispatchException e) {
            LOG.warn("Can not add remote operation {} for region {} to server {}, this usually "
              + "because the server is alread dead, give up and mark the procedure as complete, "
              + "the parent procedure will take care of this.", this, region, targetServer, e);
            unattach(env);
            return null;
          }
          // Park on the event until remoteCallFailed, reportTransition or serverCrashed wakes it.
          event.suspend();
          event.suspendIfNotReady(this);
          throw new ProcedureSuspendedException();
        }
        case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
          // NOTE(review): checkFuture presumably returns true once a previously stored future
          // has been dealt with (running the unattach callback) - confirm in ProcedureFutureUtil.
          if (
            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
              () -> unattach(env))
          ) {
            return null;
          }
          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
            env.getAssignmentManager().persistToMeta(regionNode), env, () -> unattach(env));
          return null;
        case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
          // the remote call is failed so we do not need to change the region state, just return.
          unattach(env);
          return null;
        case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
          // Same future handling as REPORT_SUCCEED, but for regionClosedAbnormally.
          if (
            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
              () -> unattach(env))
          ) {
            return null;
          }
          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
            env.getAssignmentManager().regionClosedAbnormally(regionNode), env,
            () -> unattach(env));
          return null;
        default:
          throw new IllegalStateException("Unknown state: " + state);
      }
    } catch (IOException e) {
      // The retry counter is created on first use and kept for subsequent failures.
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
      throw suspend(Math.toIntExact(backoff), true);
    } finally {
      // Keep the region node locked while a future is still stored; otherwise release it.
      if (future == null) {
        regionNode.unlock(this);
      }
    }
  }
364
365  @Override
366  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
367    setState(ProcedureProtos.ProcedureState.RUNNABLE);
368    env.getProcedureScheduler().addFront(this);
369    return false; // 'false' means that this procedure handled the timeout
370  }
371
  /**
   * @return always {@code false} (presumably telling the dispatcher not to keep this procedure in
   *         its dispatched queue; confirm against the RemoteProcedure contract)
   */
  @Override
  public boolean storeInDispatchedQueue() {
    return false;
  }
376
377  @Override
378  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
379    RegionRemoteProcedureBaseStateData.Builder builder =
380      RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
381        .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
382    if (transitionCode != null) {
383      builder.setTransitionCode(transitionCode);
384      builder.setSeqId(seqId);
385    }
386    serializer.serialize(builder.build());
387  }
388
389  @Override
390  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
391    RegionRemoteProcedureBaseStateData data =
392      serializer.deserialize(RegionRemoteProcedureBaseStateData.class);
393    region = ProtobufUtil.toRegionInfo(data.getRegion());
394    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
395    // 'state' may not be present if we are reading an 'old' form of this pb Message.
396    if (data.hasState()) {
397      state = data.getState();
398    }
399    if (data.hasTransitionCode()) {
400      transitionCode = data.getTransitionCode();
401      seqId = data.getSeqId();
402    }
403  }
404
405  @Override
406  protected void afterReplay(MasterProcedureEnv env) {
407    getParent(env).attachRemoteProc(this);
408  }
409
410  @Override
411  public String getProcName() {
412    return getClass().getSimpleName() + " " + region.getEncodedName();
413  }
414
415  @Override
416  protected void toStringClassDetails(StringBuilder builder) {
417    builder.append(getProcName());
418    if (targetServer != null) {
419      builder.append(", server=");
420      builder.append(this.targetServer);
421    }
422    if (this.retryCounter != null) {
423      builder.append(", retry=");
424      builder.append(this.retryCounter);
425    }
426  }
427}