001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.LongConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
049
050/**
051 * The base class for all replication peer related procedure except sync replication state
052 * transition.
053 */
054@InterfaceAudience.Private
055public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
056
057  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
058
059  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
060
061  // The sleep interval when waiting table to be enabled or disabled.
062  protected static final int SLEEP_INTERVAL_MS = 1000;
063
064  private RetryCounter retryCounter;
065
066  protected ModifyPeerProcedure() {
067  }
068
069  protected ModifyPeerProcedure(String peerId) {
070    super(peerId);
071  }
072
073  /**
074   * Called before we start the actual processing. The implementation should call the pre CP hook,
075   * and also the pre-check for the peer modification.
076   * <p>
077   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
078   * all checks passes then the procedure can not be rolled back any more.
079   */
080  protected abstract void prePeerModification(MasterProcedureEnv env)
081      throws IOException, ReplicationException;
082
083  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
084
085  /**
086   * Called before we finish the procedure. The implementation can do some logging work, and also
087   * call the coprocessor hook if any.
088   * <p>
089   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
090   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
091   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
092   * update the peer storage.
093   */
094  protected abstract void postPeerModification(MasterProcedureEnv env)
095      throws IOException, ReplicationException;
096
097  private void releaseLatch() {
098    ProcedurePrepareLatch.releaseLatch(latch, this);
099  }
100
101  /**
102   * Implementation class can override this method. By default we will jump to
103   * POST_PEER_MODIFICATION and finish the procedure.
104   */
105  protected PeerModificationState nextStateAfterRefresh() {
106    return PeerModificationState.POST_PEER_MODIFICATION;
107  }
108
109  /**
110   * The implementation class should override this method if the procedure may enter the serial
111   * related states.
112   */
113  protected boolean enablePeerBeforeFinish() {
114    throw new UnsupportedOperationException();
115  }
116
117  private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
118    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
119      .map(sn -> new RefreshPeerProcedure(peerId, type, sn))
120      .toArray(RefreshPeerProcedure[]::new));
121  }
122
123  protected ReplicationPeerConfig getOldPeerConfig() {
124    return null;
125  }
126
127  protected ReplicationPeerConfig getNewPeerConfig() {
128    throw new UnsupportedOperationException();
129  }
130
131  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
132      throws IOException, ReplicationException {
133    throw new UnsupportedOperationException();
134  }
135
136  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
137  // regions.
138  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
139    for (;;) {
140      try {
141        TableState state = tsm.getTableState(tn);
142        if (state.isEnabled()) {
143          return true;
144        }
145        if (!state.isEnabling()) {
146          return false;
147        }
148        Thread.sleep(SLEEP_INTERVAL_MS);
149      } catch (TableNotFoundException e) {
150        return false;
151      } catch (InterruptedException e) {
152        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
153      }
154    }
155  }
156
157  // will be override in test to simulate error
158  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
159    ReplicationPeerConfig peerConfig = getNewPeerConfig();
160    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
161    TableStateManager tsm = env.getMasterServices().getTableStateManager();
162    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
163      if (!td.hasGlobalReplicationScope()) {
164        continue;
165      }
166      TableName tn = td.getTableName();
167      if (!peerConfig.needToReplicate(tn)) {
168        continue;
169      }
170      if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
171        oldPeerConfig.needToReplicate(tn)) {
172        continue;
173      }
174      if (needReopen(tsm, tn)) {
175        addChildProcedure(new ReopenTableRegionsProcedure(tn));
176      }
177    }
178  }
179
180  // will be override in test to simulate error
181  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
182    env.getReplicationPeerManager().enablePeer(peerId);
183  }
184
185  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
186      ReplicationQueueStorage queueStorage) throws ReplicationException {
187    if (barrier >= 0) {
188      lastSeqIds.put(encodedRegionName, barrier);
189      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
190        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
191        lastSeqIds.clear();
192      }
193    }
194  }
195
196  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
197      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
198    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
199    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
200      if (!td.hasGlobalReplicationScope()) {
201        continue;
202      }
203      TableName tn = td.getTableName();
204      if (!peerConfig.needToReplicate(tn)) {
205        continue;
206      }
207      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
208    }
209    if (!lastSeqIds.isEmpty()) {
210      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
211    }
212  }
213
214  // If the table is currently disabling, then we need to wait until it is disabled.We will write
215  // replication barrier for a disabled table. And return whether we need to update the last pushed
216  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
217  // then we do not need to update last pushed sequence id for this table.
218  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
219      throws IOException {
220    for (;;) {
221      try {
222        if (!tsm.getTableState(tn).isDisabling()) {
223          return true;
224        }
225        Thread.sleep(SLEEP_INTERVAL_MS);
226      } catch (TableNotFoundException e) {
227        return false;
228      } catch (InterruptedException e) {
229        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
230      }
231    }
232  }
233
234  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
235  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
236  // should not forget to check whether the map is empty at last, if not you should call
237  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
238  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
239      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
240    TableStateManager tsm = env.getMasterServices().getTableStateManager();
241    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
242    Connection conn = env.getMasterServices().getConnection();
243    if (!needSetLastPushedSequenceId(tsm, tableName)) {
244      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
245      return;
246    }
247    for (Pair<String, Long> name2Barrier : MetaTableAccessor
248      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
249      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
250      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
251        queueStorage);
252    }
253  }
254
255  @Override
256  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
257    setState(ProcedureProtos.ProcedureState.RUNNABLE);
258    env.getProcedureScheduler().addFront(this);
259    return false;
260  }
261
262  private ProcedureSuspendedException suspend(Configuration conf,
263      LongConsumer backoffConsumer) throws ProcedureSuspendedException {
264    if (retryCounter == null) {
265      retryCounter = ProcedureUtil.createRetryCounter(conf);
266    }
267    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
268    backoffConsumer.accept(backoff);
269    setTimeout(Math.toIntExact(backoff));
270    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
271    skipPersistence();
272    throw new ProcedureSuspendedException();
273  }
274
275  @Override
276  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
277      throws ProcedureSuspendedException {
278    switch (state) {
279      case PRE_PEER_MODIFICATION:
280        try {
281          prePeerModification(env);
282        } catch (IOException e) {
283          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
284            "mark the procedure as failure and give up", getClass().getName(), peerId, e);
285          setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
286          releaseLatch();
287          return Flow.NO_MORE_STATE;
288        } catch (ReplicationException e) {
289          throw suspend(env.getMasterConfiguration(),
290            backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
291              getClass().getName(), peerId, backoff / 1000, e));
292        }
293        retryCounter = null;
294        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
295        return Flow.HAS_MORE_STATE;
296      case UPDATE_PEER_STORAGE:
297        try {
298          updatePeerStorage(env);
299        } catch (ReplicationException e) {
300          throw suspend(env.getMasterConfiguration(),
301            backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
302              getClass().getName(), peerId, backoff / 1000, e));
303        }
304        retryCounter = null;
305        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
306        return Flow.HAS_MORE_STATE;
307      case REFRESH_PEER_ON_RS:
308        refreshPeer(env, getPeerOperationType());
309        setNextState(nextStateAfterRefresh());
310        return Flow.HAS_MORE_STATE;
311      case SERIAL_PEER_REOPEN_REGIONS:
312        try {
313          reopenRegions(env);
314        } catch (Exception e) {
315          throw suspend(env.getMasterConfiguration(),
316            backoff -> LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs",
317              getClass().getName(), peerId, backoff / 1000, e));
318        }
319        retryCounter = null;
320        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
321        return Flow.HAS_MORE_STATE;
322      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
323        try {
324          updateLastPushedSequenceIdForSerialPeer(env);
325        } catch (Exception e) {
326          throw suspend(env.getMasterConfiguration(),
327            backoff -> LOG.warn("{} set last sequence id for peer {} failed,  sleep {} secs",
328              getClass().getName(), peerId, backoff / 1000, e));
329        }
330        retryCounter = null;
331        setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
332          : PeerModificationState.POST_PEER_MODIFICATION);
333        return Flow.HAS_MORE_STATE;
334      case SERIAL_PEER_SET_PEER_ENABLED:
335        try {
336          enablePeer(env);
337        } catch (ReplicationException e) {
338          throw suspend(env.getMasterConfiguration(),
339            backoff -> LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} secs",
340              getClass().getName(), peerId, backoff / 1000, e));
341        }
342        retryCounter = null;
343        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
344        return Flow.HAS_MORE_STATE;
345      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
346        refreshPeer(env, PeerOperationType.ENABLE);
347        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
348        return Flow.HAS_MORE_STATE;
349      case POST_PEER_MODIFICATION:
350        try {
351          postPeerModification(env);
352        } catch (ReplicationException e) {
353          throw suspend(env.getMasterConfiguration(),
354            backoff -> LOG.warn(
355              "{} failed to call postPeerModification for peer {},  sleep {} secs",
356              getClass().getName(), peerId, backoff / 1000, e));
357        } catch (IOException e) {
358          LOG.warn("{} failed to call post CP hook for peer {}, " +
359            "ignore since the procedure has already done", getClass().getName(), peerId, e);
360        }
361        releaseLatch();
362        return Flow.NO_MORE_STATE;
363      default:
364        throw new UnsupportedOperationException("unhandled state=" + state);
365    }
366  }
367
368  @Override
369  protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
370      throws IOException, InterruptedException {
371    if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
372      // actually the peer related operations has no rollback, but if we haven't done any
373      // modifications on the peer storage yet, we can just return.
374      return;
375    }
376    throw new UnsupportedOperationException();
377  }
378
379  @Override
380  protected PeerModificationState getState(int stateId) {
381    return PeerModificationState.forNumber(stateId);
382  }
383
384  @Override
385  protected int getStateId(PeerModificationState state) {
386    return state.getNumber();
387  }
388
389  @Override
390  protected PeerModificationState getInitialState() {
391    return PeerModificationState.PRE_PEER_MODIFICATION;
392  }
393}