001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.HashMap;
023import java.util.Map;
024import org.apache.hadoop.hbase.MetaTableAccessor;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.client.TableDescriptor;
028import org.apache.hadoop.hbase.master.TableStateManager;
029import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
031import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
032import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
033import org.apache.hadoop.hbase.replication.ReplicationException;
034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
035import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
036import org.apache.hadoop.hbase.replication.ReplicationUtils;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
043
044/**
045 * The base class for all replication peer related procedure.
046 */
047@InterfaceAudience.Private
048public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState>
049    implements PeerProcedureInterface {
050
051  private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class);
052
053  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
054
055  // The sleep interval when waiting table to be enabled or disabled.
056  protected static final int SLEEP_INTERVAL_MS = 1000;
057
058  // used to keep compatible with old client where we can only returns after updateStorage.
059  protected ProcedurePrepareLatch latch;
060
061  protected AbstractPeerProcedure() {
062  }
063
064  protected AbstractPeerProcedure(String peerId) {
065    super(peerId);
066    this.latch = ProcedurePrepareLatch.createLatch(2, 1);
067  }
068
069  public ProcedurePrepareLatch getLatch() {
070    return latch;
071  }
072
073  @Override
074  protected LockState acquireLock(MasterProcedureEnv env) {
075    if (env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)) {
076      return LockState.LOCK_EVENT_WAIT;
077    }
078    return LockState.LOCK_ACQUIRED;
079  }
080
081  @Override
082  protected void releaseLock(MasterProcedureEnv env) {
083    env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId);
084  }
085
086  @Override
087  protected boolean holdLock(MasterProcedureEnv env) {
088    return true;
089  }
090
091  protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
092    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
093      .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
094  }
095
096  // will be override in test to simulate error
097  @VisibleForTesting
098  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
099    env.getReplicationPeerManager().enablePeer(peerId);
100  }
101
102  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
103      ReplicationQueueStorage queueStorage) throws ReplicationException {
104    if (barrier >= 0) {
105      lastSeqIds.put(encodedRegionName, barrier);
106      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
107        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
108        lastSeqIds.clear();
109      }
110    }
111  }
112
113  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
114      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
115    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
116    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
117      if (!td.hasGlobalReplicationScope()) {
118        continue;
119      }
120      TableName tn = td.getTableName();
121      if (!ReplicationUtils.contains(peerConfig, tn)) {
122        continue;
123      }
124      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
125    }
126    if (!lastSeqIds.isEmpty()) {
127      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
128    }
129  }
130
131  // If the table is currently disabling, then we need to wait until it is disabled.We will write
132  // replication barrier for a disabled table. And return whether we need to update the last pushed
133  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
134  // then we do not need to update last pushed sequence id for this table.
135  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
136      throws IOException {
137    for (;;) {
138      try {
139        if (!tsm.getTableState(tn).isDisabling()) {
140          return true;
141        }
142        Thread.sleep(SLEEP_INTERVAL_MS);
143      } catch (TableStateNotFoundException e) {
144        return false;
145      } catch (InterruptedException e) {
146        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
147      }
148    }
149  }
150
151  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
152  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
153  // should not forget to check whether the map is empty at last, if not you should call
154  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
155  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
156      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
157    TableStateManager tsm = env.getMasterServices().getTableStateManager();
158    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
159    Connection conn = env.getMasterServices().getConnection();
160    if (!needSetLastPushedSequenceId(tsm, tableName)) {
161      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
162      return;
163    }
164    for (Pair<String, Long> name2Barrier : MetaTableAccessor
165      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
166      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
167      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
168        queueStorage);
169    }
170  }
171}