001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
038
039/**
040 * The base class for all replication peer related procedure except sync replication state
041 * transition.
042 */
043@InterfaceAudience.Private
044public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
045
046  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
047
048  protected ModifyPeerProcedure() {
049  }
050
051  protected ModifyPeerProcedure(String peerId) {
052    super(peerId);
053  }
054
055  /**
056   * Called before we start the actual processing. The implementation should call the pre CP hook,
057   * and also the pre-check for the peer modification.
058   * <p>
059   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
060   * all checks passes then the procedure can not be rolled back any more.
061   */
062  protected abstract void prePeerModification(MasterProcedureEnv env)
063    throws IOException, ReplicationException, InterruptedException;
064
065  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
066
067  /**
068   * Called before we finish the procedure. The implementation can do some logging work, and also
069   * call the coprocessor hook if any.
070   * <p>
071   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
072   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
073   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
074   * update the peer storage.
075   */
076  protected abstract void postPeerModification(MasterProcedureEnv env)
077    throws IOException, ReplicationException;
078
079  protected void releaseLatch(MasterProcedureEnv env) {
080    ProcedurePrepareLatch.releaseLatch(latch, this);
081  }
082
083  /**
084   * Implementation class can override this method. By default we will jump to
085   * POST_PEER_MODIFICATION and finish the procedure.
086   */
087  protected PeerModificationState nextStateAfterRefresh() {
088    return PeerModificationState.POST_PEER_MODIFICATION;
089  }
090
091  /**
092   * The implementation class should override this method if the procedure may enter the serial
093   * related states.
094   */
095  protected boolean enablePeerBeforeFinish() {
096    throw new UnsupportedOperationException();
097  }
098
099  protected ReplicationPeerConfig getOldPeerConfig() {
100    return null;
101  }
102
103  protected ReplicationPeerConfig getNewPeerConfig() {
104    throw new UnsupportedOperationException();
105  }
106
107  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
108    throws IOException, ReplicationException {
109    throw new UnsupportedOperationException();
110  }
111
112  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
113  // regions.
114  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
115    for (;;) {
116      try {
117        TableState state = tsm.getTableState(tn);
118        if (state.isEnabled()) {
119          return true;
120        }
121        if (!state.isEnabling()) {
122          return false;
123        }
124        Thread.sleep(SLEEP_INTERVAL_MS);
125      } catch (TableNotFoundException e) {
126        return false;
127      } catch (InterruptedException e) {
128        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
129      }
130    }
131  }
132
133  // will be override in test to simulate error
134  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
135    ReplicationPeerConfig peerConfig = getNewPeerConfig();
136    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
137    TableStateManager tsm = env.getMasterServices().getTableStateManager();
138    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
139      if (!td.hasGlobalReplicationScope()) {
140        continue;
141      }
142      TableName tn = td.getTableName();
143      if (!peerConfig.needToReplicate(tn)) {
144        continue;
145      }
146      if (oldPeerConfig != null && oldPeerConfig.isSerial() && oldPeerConfig.needToReplicate(tn)) {
147        continue;
148      }
149      if (needReopen(tsm, tn)) {
150        addChildProcedure(new ReopenTableRegionsProcedure(tn));
151      }
152    }
153  }
154
155  @Override
156  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
157    throws ProcedureSuspendedException, InterruptedException {
158    switch (state) {
159      case PRE_PEER_MODIFICATION:
160        try {
161          prePeerModification(env);
162        } catch (IOException e) {
163          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "
164            + "mark the procedure as failure and give up", getClass().getName(), peerId, e);
165          setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
166          releaseLatch(env);
167          return Flow.NO_MORE_STATE;
168        } catch (ReplicationException e) {
169          throw suspend(env.getMasterConfiguration(),
170            backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
171              getClass().getName(), peerId, backoff / 1000, e));
172        }
173        resetRetry();
174        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
175        return Flow.HAS_MORE_STATE;
176      case UPDATE_PEER_STORAGE:
177        try {
178          updatePeerStorage(env);
179        } catch (ReplicationException e) {
180          throw suspend(env.getMasterConfiguration(),
181            backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
182              getClass().getName(), peerId, backoff / 1000, e));
183        }
184        resetRetry();
185        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
186        return Flow.HAS_MORE_STATE;
187      case REFRESH_PEER_ON_RS:
188        refreshPeer(env, getPeerOperationType());
189        setNextState(nextStateAfterRefresh());
190        return Flow.HAS_MORE_STATE;
191      case SERIAL_PEER_REOPEN_REGIONS:
192        try {
193          reopenRegions(env);
194        } catch (Exception e) {
195          throw suspend(env.getMasterConfiguration(),
196            backoff -> LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs",
197              getClass().getName(), peerId, backoff / 1000, e));
198        }
199        resetRetry();
200        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
201        return Flow.HAS_MORE_STATE;
202      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
203        try {
204          updateLastPushedSequenceIdForSerialPeer(env);
205        } catch (Exception e) {
206          throw suspend(env.getMasterConfiguration(),
207            backoff -> LOG.warn("{} set last sequence id for peer {} failed,  sleep {} secs",
208              getClass().getName(), peerId, backoff / 1000, e));
209        }
210        resetRetry();
211        setNextState(enablePeerBeforeFinish()
212          ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
213          : PeerModificationState.POST_PEER_MODIFICATION);
214        return Flow.HAS_MORE_STATE;
215      case SERIAL_PEER_SET_PEER_ENABLED:
216        try {
217          enablePeer(env);
218        } catch (ReplicationException e) {
219          throw suspend(env.getMasterConfiguration(),
220            backoff -> LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} secs",
221              getClass().getName(), peerId, backoff / 1000, e));
222        }
223        resetRetry();
224        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
225        return Flow.HAS_MORE_STATE;
226      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
227        refreshPeer(env, PeerOperationType.ENABLE);
228        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
229        return Flow.HAS_MORE_STATE;
230      case POST_PEER_MODIFICATION:
231        try {
232          postPeerModification(env);
233        } catch (ReplicationException e) {
234          throw suspend(env.getMasterConfiguration(),
235            backoff -> LOG.warn(
236              "{} failed to call postPeerModification for peer {},  sleep {} secs",
237              getClass().getName(), peerId, backoff / 1000, e));
238        } catch (IOException e) {
239          LOG.warn("{} failed to call post CP hook for peer {}, "
240            + "ignore since the procedure has already done", getClass().getName(), peerId, e);
241        }
242        releaseLatch(env);
243        return Flow.NO_MORE_STATE;
244      default:
245        throw new UnsupportedOperationException("unhandled state=" + state);
246    }
247  }
248
249  @Override
250  protected PeerModificationState getState(int stateId) {
251    return PeerModificationState.forNumber(stateId);
252  }
253
254  @Override
255  protected int getStateId(PeerModificationState state) {
256    return state.getNumber();
257  }
258
259  @Override
260  protected PeerModificationState getInitialState() {
261    return PeerModificationState.PRE_PEER_MODIFICATION;
262  }
263}