001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
024import org.apache.hadoop.hbase.MetaTableAccessor;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.client.TableDescriptor;
028import org.apache.hadoop.hbase.client.TableState;
029import org.apache.hadoop.hbase.master.TableStateManager;
030import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
031import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
032import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
033import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
034import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
035import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
036import org.apache.hadoop.hbase.replication.ReplicationException;
037import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
038import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
039import org.apache.hadoop.hbase.util.Pair;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
048
049/**
050 * The base class for all replication peer related procedure except sync replication state
051 * transition.
052 */
053@InterfaceAudience.Private
054public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
055
056  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
057
058  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
059
060  // The sleep interval when waiting table to be enabled or disabled.
061  protected static final int SLEEP_INTERVAL_MS = 1000;
062
063  private int attemps;
064
065  protected ModifyPeerProcedure() {
066  }
067
068  protected ModifyPeerProcedure(String peerId) {
069    super(peerId);
070  }
071
072  /**
073   * Called before we start the actual processing. The implementation should call the pre CP hook,
074   * and also the pre-check for the peer modification.
075   * <p>
076   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
077   * all checks passes then the procedure can not be rolled back any more.
078   */
079  protected abstract void prePeerModification(MasterProcedureEnv env)
080      throws IOException, ReplicationException;
081
082  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
083
084  /**
085   * Called before we finish the procedure. The implementation can do some logging work, and also
086   * call the coprocessor hook if any.
087   * <p>
088   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
089   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
090   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
091   * update the peer storage.
092   */
093  protected abstract void postPeerModification(MasterProcedureEnv env)
094      throws IOException, ReplicationException;
095
096  private void releaseLatch() {
097    ProcedurePrepareLatch.releaseLatch(latch, this);
098  }
099
100  /**
101   * Implementation class can override this method. By default we will jump to
102   * POST_PEER_MODIFICATION and finish the procedure.
103   */
104  protected PeerModificationState nextStateAfterRefresh() {
105    return PeerModificationState.POST_PEER_MODIFICATION;
106  }
107
108  /**
109   * The implementation class should override this method if the procedure may enter the serial
110   * related states.
111   */
112  protected boolean enablePeerBeforeFinish() {
113    throw new UnsupportedOperationException();
114  }
115
116  private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
117    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
118      .map(sn -> new RefreshPeerProcedure(peerId, type, sn))
119      .toArray(RefreshPeerProcedure[]::new));
120  }
121
122  protected ReplicationPeerConfig getOldPeerConfig() {
123    return null;
124  }
125
126  protected ReplicationPeerConfig getNewPeerConfig() {
127    throw new UnsupportedOperationException();
128  }
129
130  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
131      throws IOException, ReplicationException {
132    throw new UnsupportedOperationException();
133  }
134
135  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
136  // regions.
137  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
138    for (;;) {
139      try {
140        TableState state = tsm.getTableState(tn);
141        if (state.isEnabled()) {
142          return true;
143        }
144        if (!state.isEnabling()) {
145          return false;
146        }
147        Thread.sleep(SLEEP_INTERVAL_MS);
148      } catch (TableStateNotFoundException e) {
149        return false;
150      } catch (InterruptedException e) {
151        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
152      }
153    }
154  }
155
156  // will be override in test to simulate error
157  @VisibleForTesting
158  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
159    ReplicationPeerConfig peerConfig = getNewPeerConfig();
160    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
161    TableStateManager tsm = env.getMasterServices().getTableStateManager();
162    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
163      if (!td.hasGlobalReplicationScope()) {
164        continue;
165      }
166      TableName tn = td.getTableName();
167      if (!peerConfig.needToReplicate(tn)) {
168        continue;
169      }
170      if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
171        oldPeerConfig.needToReplicate(tn)) {
172        continue;
173      }
174      if (needReopen(tsm, tn)) {
175        addChildProcedure(new ReopenTableRegionsProcedure(tn));
176      }
177    }
178  }
179
180  // will be override in test to simulate error
181  @VisibleForTesting
182  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
183    env.getReplicationPeerManager().enablePeer(peerId);
184  }
185
186  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
187      ReplicationQueueStorage queueStorage) throws ReplicationException {
188    if (barrier >= 0) {
189      lastSeqIds.put(encodedRegionName, barrier);
190      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
191        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
192        lastSeqIds.clear();
193      }
194    }
195  }
196
197  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
198      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
199    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
200    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
201      if (!td.hasGlobalReplicationScope()) {
202        continue;
203      }
204      TableName tn = td.getTableName();
205      if (!peerConfig.needToReplicate(tn)) {
206        continue;
207      }
208      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
209    }
210    if (!lastSeqIds.isEmpty()) {
211      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
212    }
213  }
214
215  // If the table is currently disabling, then we need to wait until it is disabled.We will write
216  // replication barrier for a disabled table. And return whether we need to update the last pushed
217  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
218  // then we do not need to update last pushed sequence id for this table.
219  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
220      throws IOException {
221    for (;;) {
222      try {
223        if (!tsm.getTableState(tn).isDisabling()) {
224          return true;
225        }
226        Thread.sleep(SLEEP_INTERVAL_MS);
227      } catch (TableStateNotFoundException e) {
228        return false;
229      } catch (InterruptedException e) {
230        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
231      }
232    }
233  }
234
235  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
236  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
237  // should not forget to check whether the map is empty at last, if not you should call
238  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
239  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
240      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
241    TableStateManager tsm = env.getMasterServices().getTableStateManager();
242    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
243    Connection conn = env.getMasterServices().getConnection();
244    if (!needSetLastPushedSequenceId(tsm, tableName)) {
245      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
246      return;
247    }
248    for (Pair<String, Long> name2Barrier : MetaTableAccessor
249      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
250      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
251      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
252        queueStorage);
253    }
254  }
255
256  @Override
257  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
258    setState(ProcedureProtos.ProcedureState.RUNNABLE);
259    env.getProcedureScheduler().addFront(this);
260    return false;
261  }
262
263  private ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException {
264    attemps++;
265    setTimeout(Math.toIntExact(backoff));
266    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
267    skipPersistence();
268    throw new ProcedureSuspendedException();
269  }
270
271  @Override
272  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
273      throws ProcedureSuspendedException {
274    switch (state) {
275      case PRE_PEER_MODIFICATION:
276        try {
277          prePeerModification(env);
278        } catch (IOException e) {
279          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
280            "mark the procedure as failure and give up", getClass().getName(), peerId, e);
281          setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
282          releaseLatch();
283          return Flow.NO_MORE_STATE;
284        } catch (ReplicationException e) {
285          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
286          LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
287            getClass().getName(), peerId, backoff / 1000, e);
288          throw suspend(backoff);
289        }
290        attemps = 0;
291        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
292        return Flow.HAS_MORE_STATE;
293      case UPDATE_PEER_STORAGE:
294        try {
295          updatePeerStorage(env);
296        } catch (ReplicationException e) {
297          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
298          LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(),
299            peerId, backoff / 1000, e);
300          throw suspend(backoff);
301        }
302        attemps = 0;
303        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
304        return Flow.HAS_MORE_STATE;
305      case REFRESH_PEER_ON_RS:
306        refreshPeer(env, getPeerOperationType());
307        setNextState(nextStateAfterRefresh());
308        return Flow.HAS_MORE_STATE;
309      case SERIAL_PEER_REOPEN_REGIONS:
310        try {
311          reopenRegions(env);
312        } catch (Exception e) {
313          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
314          LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs", getClass().getName(),
315            peerId, backoff / 1000, e);
316          throw suspend(backoff);
317        }
318        attemps = 0;
319        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
320        return Flow.HAS_MORE_STATE;
321      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
322        try {
323          updateLastPushedSequenceIdForSerialPeer(env);
324        } catch (Exception e) {
325          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
326          LOG.warn("{} set last sequence id for peer {} failed,  sleep {} secs",
327            getClass().getName(), peerId, backoff / 1000, e);
328          throw suspend(backoff);
329        }
330        attemps = 0;
331        setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
332          : PeerModificationState.POST_PEER_MODIFICATION);
333        return Flow.HAS_MORE_STATE;
334      case SERIAL_PEER_SET_PEER_ENABLED:
335        try {
336          enablePeer(env);
337        } catch (ReplicationException e) {
338          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
339          LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} secs",
340            getClass().getName(), peerId, backoff / 1000, e);
341          throw suspend(backoff);
342        }
343        attemps = 0;
344        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
345        return Flow.HAS_MORE_STATE;
346      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
347        refreshPeer(env, PeerOperationType.ENABLE);
348        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
349        return Flow.HAS_MORE_STATE;
350      case POST_PEER_MODIFICATION:
351        try {
352          postPeerModification(env);
353        } catch (ReplicationException e) {
354          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
355          LOG.warn("{} failed to call postPeerModification for peer {},  sleep {} secs",
356            getClass().getName(), peerId, backoff / 1000, e);
357          throw suspend(backoff);
358        } catch (IOException e) {
359          LOG.warn("{} failed to call post CP hook for peer {}, " +
360            "ignore since the procedure has already done", getClass().getName(), peerId, e);
361        }
362        releaseLatch();
363        return Flow.NO_MORE_STATE;
364      default:
365        throw new UnsupportedOperationException("unhandled state=" + state);
366    }
367  }
368
369  @Override
370  protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
371      throws IOException, InterruptedException {
372    if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
373      // actually the peer related operations has no rollback, but if we haven't done any
374      // modifications on the peer storage yet, we can just return.
375      return;
376    }
377    throw new UnsupportedOperationException();
378  }
379
380  @Override
381  protected PeerModificationState getState(int stateId) {
382    return PeerModificationState.forNumber(stateId);
383  }
384
385  @Override
386  protected int getStateId(PeerModificationState state) {
387    return state.getNumber();
388  }
389
390  @Override
391  protected PeerModificationState getInitialState() {
392    return PeerModificationState.PRE_PEER_MODIFICATION;
393  }
394}