001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.HashMap; 023import java.util.Map; 024import org.apache.hadoop.hbase.TableName; 025import org.apache.hadoop.hbase.TableNotFoundException; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.client.TableDescriptor; 028import org.apache.hadoop.hbase.master.TableStateManager; 029import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; 031import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; 032import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat; 033import org.apache.hadoop.hbase.replication.ReplicationException; 034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 035import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * The base class for all replication peer related procedure. 043 */ 044@InterfaceAudience.Private 045public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState> 046 implements PeerProcedureInterface { 047 048 private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class); 049 050 protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000; 051 052 // The sleep interval when waiting table to be enabled or disabled. 053 protected static final int SLEEP_INTERVAL_MS = 1000; 054 055 // used to keep compatible with old client where we can only return after updateStorage. 056 protected ProcedurePrepareLatch latch; 057 058 protected AbstractPeerProcedure() { 059 } 060 061 protected AbstractPeerProcedure(String peerId) { 062 super(peerId); 063 this.latch = ProcedurePrepareLatch.createLatch(2, 1); 064 } 065 066 public ProcedurePrepareLatch getLatch() { 067 return latch; 068 } 069 070 @Override 071 protected LockState acquireLock(MasterProcedureEnv env) { 072 if (env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)) { 073 return LockState.LOCK_EVENT_WAIT; 074 } 075 return LockState.LOCK_ACQUIRED; 076 } 077 078 @Override 079 protected void releaseLock(MasterProcedureEnv env) { 080 env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId); 081 } 082 083 @Override 084 protected boolean holdLock(MasterProcedureEnv env) { 085 return true; 086 } 087 088 protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type) { 089 addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() 090 .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new)); 091 } 092 093 // will be override in test to simulate error 094 protected void enablePeer(MasterProcedureEnv env) throws ReplicationException { 095 env.getReplicationPeerManager().enablePeer(peerId); 096 } 097 098 private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier, 099 ReplicationQueueStorage queueStorage) throws ReplicationException { 100 if (barrier >= 0) { 101 lastSeqIds.put(encodedRegionName, barrier); 102 if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) { 103 queueStorage.setLastSequenceIds(peerId, lastSeqIds); 104 lastSeqIds.clear(); 105 } 106 } 107 } 108 109 protected final void setLastPushedSequenceId(MasterProcedureEnv env, 110 ReplicationPeerConfig peerConfig) throws IOException, ReplicationException { 111 Map<String, Long> lastSeqIds = new HashMap<String, Long>(); 112 for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) { 113 if (!td.hasGlobalReplicationScope()) { 114 continue; 115 } 116 TableName tn = td.getTableName(); 117 if (!peerConfig.needToReplicate(tn)) { 118 continue; 119 } 120 setLastPushedSequenceIdForTable(env, tn, lastSeqIds); 121 } 122 if (!lastSeqIds.isEmpty()) { 123 env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds); 124 } 125 } 126 127 // If the table is currently disabling, then we need to wait until it is disabled.We will write 128 // replication barrier for a disabled table. And return whether we need to update the last pushed 129 // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException, 130 // then we do not need to update last pushed sequence id for this table. 131 private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn) 132 throws IOException { 133 for (;;) { 134 try { 135 if (!tsm.getTableState(tn).isDisabling()) { 136 return true; 137 } 138 Thread.sleep(SLEEP_INTERVAL_MS); 139 } catch (TableNotFoundException e) { 140 return false; 141 } catch (InterruptedException e) { 142 throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e); 143 } 144 } 145 } 146 147 // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is 148 // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller 149 // should not forget to check whether the map is empty at last, if not you should call 150 // queueStorage.setLastSequenceIds to write out the remaining entries in the map. 151 protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName, 152 Map<String, Long> lastSeqIds) throws IOException, ReplicationException { 153 TableStateManager tsm = env.getMasterServices().getTableStateManager(); 154 ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage(); 155 Connection conn = env.getMasterServices().getConnection(); 156 if (!needSetLastPushedSequenceId(tsm, tableName)) { 157 LOG.debug("Skip settting last pushed sequence id for {}", tableName); 158 return; 159 } 160 for (Pair<String, Long> name2Barrier : ReplicationBarrierFamilyFormat 161 .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) { 162 LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier); 163 addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1, 164 queueStorage); 165 } 166 } 167 168 protected final void checkPeerModificationEnabled(MasterProcedureEnv env) throws IOException { 169 if (!env.getMasterServices().isReplicationPeerModificationEnabled()) { 170 throw new IOException("Replication peer modification disabled"); 171 } 172 } 173}