/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Perform read/write to the replication queue storage.
 */
@InterfaceAudience.Private
public interface ReplicationQueueStorage {

  /**
   * Set the current offset for a specific WAL group in a given queue.
   * @param queueId    the id of the queue
   * @param walGroup   the group of the WAL, can be empty if multi wal is not enabled
   * @param offset     the current offset of replication progress
   * @param lastSeqIds map from encoded region name to sequence id, used for serial replication
   */
  void setOffset(ReplicationQueueId queueId, String walGroup, ReplicationGroupOffset offset,
    Map<String, Long> lastSeqIds) throws ReplicationException;

  /**
   * Get the current offset of all the WAL groups for a queue.
   * @param queueId the id of the queue
   * @return a map of all offsets of the WAL groups. The key is the WAL group and the value is the
   *         position.
   */
  Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
    throws ReplicationException;

  /**
   * Get a list of all queues for the specific peer.
   * @param peerId the id of the peer
   * @return a list of queueIds
   */
  List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException;

  /**
   * Get a list of all queues for the specific region server.
   * @param serverName the server name of the region server that owns the set of queues
   * @return a list of queueIds
   */
  List<ReplicationQueueId> listAllQueueIds(ServerName serverName) throws ReplicationException;

  /**
   * Get a list of all queues for the specific region server and the specific peer.
   * @param peerId     the id of the peer
   * @param serverName the server name of the region server that owns the set of queues
   * @return a list of queueIds
   */
  List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
    throws ReplicationException;

  /**
   * Get a list of all peer ids.
   * <p>
   * This method is designed for HBCK, where we may have some dirty data left in the storage after a
   * broken procedure run. In normal logic, you should call
   * {@link ReplicationPeerStorage#listPeerIds()} for getting all the replication peer ids.
   * @return a list of peer ids found in the queue storage
   */
  List<String> listAllPeerIds() throws ReplicationException;

  /**
   * Get a list of all queues and their offsets.
   * @return the data of all replication queues
   */
  List<ReplicationQueueData> listAllQueues() throws ReplicationException;

  /**
   * Get a list of all region servers that have outstanding replication queues. These servers could
   * be alive, dead or from a previous run of the cluster.
   * @return a list of server names
   */
  List<ServerName> listAllReplicators() throws ReplicationException;

  /**
   * Change ownership of the queue identified by queueId, which belongs to a dead region server, to
   * the given target region server.
   * @param queueId          the id of the queue
   * @param targetServerName the name of the target region server
   * @return the offsets of the WAL groups in the claimed queue, keyed by WAL group (same shape as
   *         {@link #getOffsets(ReplicationQueueId)})
   */
  Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
    ServerName targetServerName) throws ReplicationException;

  /**
   * Remove a replication queue.
   * @param queueId the id of the queue to remove
   */
  void removeQueue(ReplicationQueueId queueId) throws ReplicationException;

  /**
   * Remove all the replication queues for the given peer. Usually used when removing a peer.
   * @param peerId the id of the peer
   */
  void removeAllQueues(String peerId) throws ReplicationException;

  /**
   * Read the max sequence id of the specific region for a given peer. For serial replication, we
   * need the max sequence id to decide whether we can push the next entries.
   * @param encodedRegionName the encoded region name
   * @param peerId            peer id
   * @return the max sequence id of the specific region for a given peer.
   */
  long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;

  /**
   * Set the max sequence id of a bunch of regions for a given peer. Will be called when setting up
   * a serial replication peer.
   * @param peerId     peer id
   * @param lastSeqIds map from encoded region name to sequence id, used for serial replication
   */
  void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;

  /**
   * Remove all the max sequence id records for the given peer.
   * @param peerId peer id
   */
  void removeLastSequenceIds(String peerId) throws ReplicationException;

  /**
   * Remove the max sequence id records for the given peer and regions.
   * @param peerId             peer id
   * @param encodedRegionNames the encoded region names
   */
  void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    throws ReplicationException;

  /**
   * Remove a peer from the hfile reference queue.
   * @param peerId peer cluster id to be removed
   */
  void removePeerFromHFileRefs(String peerId) throws ReplicationException;

  /**
   * Add new hfile references to the queue.
   * @param peerId peer cluster id to which the hfiles need to be replicated
   * @param pairs  list of pairs of { HFile location in staging dir, HFile path in region dir which
   *               will be added in the queue }
   * @throws ReplicationException if fails to add a hfile reference
   */
  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;

  /**
   * Remove hfile references from the queue.
   * @param peerId peer cluster id from which these hfile references need to be removed
   * @param files  list of hfile references to be removed
   */
  void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;

  /**
   * Get the list of all peers from the hfile reference queue.
   * @return a list of peer ids
   */
  List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException;

  /**
   * Get a list of all hfile references in the given peer.
   * @param peerId a String that identifies the peer
   * @return a list of hfile references
   */
  List<String> getReplicableHFiles(String peerId) throws ReplicationException;

  /**
   * Load all hfile references in all replication queues. This method guarantees to return a
   * snapshot which contains all hfile references at the start of this call. However, some newly
   * created hfile references during the call may not be included.
   * @return a set of all hfile references
   */
  Set<String> getAllHFileRefs() throws ReplicationException;

  /**
   * Whether the replication queue table exists.
   * @return Whether the replication queue table exists
   */
  boolean hasData() throws ReplicationException;

  // The following 3 methods are used for migration.

  /**
   * Update the replication queue data for a given region server.
   * @param serverName the region server whose queues are updated
   * @param datas      the replication queue data to write
   */
  void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
    throws ReplicationException;

  /**
   * Update last pushed sequence id for the given regions and peers.
   * @param lastPushedSeqIds the last pushed sequence ids to write
   */
  void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
    throws ReplicationException;

  /**
   * Add the given hfile refs to the given peer.
   * @param peerId    the id of the peer
   * @param hfileRefs the hfile references to add
   */
  void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;

  // The following method is for cleaning up stale data after running ReplicationSyncUp.

  /**
   * Remove all the last sequence ids and hfile references data which are written before the given
   * timestamp.
   * <p/>
   * The data of these two types are not used by replication directly.
   * <p/>
   * For last sequence ids, we will check it in serial replication, to make sure that we will
   * replicate all edits in order, so if there are stale data, the worst case is that we will stop
   * replicating as we think we still need to finish previous ranges first, although actually we
   * have already replicated them out.
   * <p/>
   * For hfile references, it is just used by hfile cleaner to not remove these hfiles before we
   * replicate them out, so if there are stale data, the worst case is that we can not remove these
   * hfiles, although actually they have already been replicated out.
   * <p/>
   * So it is OK for us to just bring up the cluster first, and then use this method to delete the
   * stale data, i.e., the data which are written before a specific timestamp.
   * @param ts the timestamp; last sequence ids and hfile references written before it are removed
   */
  void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
}