/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Perform read/write to the replication queue storage.
 */
@InterfaceAudience.Private
public interface ReplicationQueueStorage {

  /**
   * Set the current offset for a specific WAL group in a given queue.
   * @param queueId    the id of the queue
   * @param walGroup   the group of the WAL, can be empty if multi wal is not enabled
   * @param offset     the current offset of replication progress
   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
   */
  void setOffset(ReplicationQueueId queueId, String walGroup, ReplicationGroupOffset offset,
    Map<String, Long> lastSeqIds) throws ReplicationException;

  /**
   * Get the current offset of all the WAL groups for a queue.
   * @param queueId the id of the queue
   * @return a map of all offsets of the WAL groups. The key is the WAL group and the value is the
   *         offset of that group.
   */
  Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
    throws ReplicationException;

  /**
   * Get a list of all queues for the specific peer.
   * @param peerId the id of the peer
   * @return a list of queueIds
   */
  List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException;

  /**
   * Get a list of all queues for the specific region server.
   * @param serverName the server name of the region server that owns the set of queues
   * @return a list of queueIds
   */
  List<ReplicationQueueId> listAllQueueIds(ServerName serverName) throws ReplicationException;

  /**
   * Get a list of all queues for the specific region server and the specific peer.
   * @param peerId     the id of the peer
   * @param serverName the server name of the region server that owns the set of queues
   * @return a list of queueIds
   */
  List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
    throws ReplicationException;

  /**
   * Get a list of all queues and the offsets.
   */
  List<ReplicationQueueData> listAllQueues() throws ReplicationException;

  /**
   * Get a list of all region servers that have outstanding replication queues. These servers could
   * be alive, dead or from a previous run of the cluster.
   * @return a list of server names
   */
  List<ServerName> listAllReplicators() throws ReplicationException;

  /**
   * Change ownership for the queue identified by queueId and belongs to a dead region server.
   * @param queueId          the id of the queue
   * @param targetServerName the name of the target region server
   * @return a map keyed by WAL group with the {@link ReplicationGroupOffset} of each group;
   *         presumably the offsets of the claimed queue, in the same shape as
   *         {@link #getOffsets(ReplicationQueueId)} (TODO confirm against the implementations)
   */
  Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
    ServerName targetServerName) throws ReplicationException;

  /**
   * Remove a replication queue.
   * @param queueId the id of the queue to remove
   */
  void removeQueue(ReplicationQueueId queueId) throws ReplicationException;

  /**
   * Remove all the replication queues for the given peer. Usually used when removing a peer.
   * @param peerId the id of the peer
   */
  void removeAllQueues(String peerId) throws ReplicationException;

  /**
   * Read the max sequence id of the specific region for a given peer. For serial replication, we
   * need the max sequence id to decide whether we can push the next entries.
   * @param encodedRegionName the encoded region name
   * @param peerId            peer id
   * @return the max sequence id of the specific region for a given peer.
   */
  long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;

  /**
   * Set the max sequence id of a bunch of regions for a given peer. Will be called when setting up
   * a serial replication peer.
   * @param peerId     peer id
   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
   */
  void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;

  /**
   * Remove all the max sequence id record for the given peer.
   * @param peerId peer id
   */
  void removeLastSequenceIds(String peerId) throws ReplicationException;

  /**
   * Remove the max sequence id record for the given peer and regions.
   * @param peerId             peer id
   * @param encodedRegionNames the encoded region names
   */
  void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    throws ReplicationException;

  /**
   * Remove a peer from hfile reference queue.
   * @param peerId peer cluster id to be removed
   */
  void removePeerFromHFileRefs(String peerId) throws ReplicationException;

  /**
   * Add new hfile references to the queue.
   * @param peerId peer cluster id to which the hfiles need to be replicated
   * @param pairs  list of pairs of { HFile location in staging dir, HFile path in region dir which
   *               will be added in the queue }
   * @throws ReplicationException if fails to add a hfile reference
   */
  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;

  /**
   * Remove hfile references from the queue.
   * @param peerId peer cluster id from which this hfile references needs to be removed
   * @param files  list of hfile references to be removed
   */
  void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;

  /**
   * Get list of all peers from hfile reference queue.
   * @return a list of peer ids
   */
  List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException;

  /**
   * Get a list of all hfile references in the given peer.
   * @param peerId a String that identifies the peer
   * @return a list of hfile references
   */
  List<String> getReplicableHFiles(String peerId) throws ReplicationException;

  /**
   * Load all hfile references in all replication queues. This method guarantees to return a
   * snapshot which contains all hfile references at the start of this call. However, some newly
   * created hfile references during the call may not be included.
   */
  Set<String> getAllHFileRefs() throws ReplicationException;

  /**
   * Whether the replication queue table exists.
   * @return Whether the replication queue table exists
   */
  boolean hasData() throws ReplicationException;

  // the below 3 methods are used for migrating
  /**
   * Update the replication queue data for a given region server.
   */
  void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
    throws ReplicationException;

  /**
   * Update last pushed sequence id for the given regions and peers.
   */
  void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
    throws ReplicationException;

  /**
   * Add the given hfile refs to the given peer.
   */
  void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;

  // the below method is for clean up stale data after running ReplicationSyncUp
  /**
   * Remove all the last sequence ids and hfile references data which are written before the given
   * timestamp.
   * <p>
   * The data of these two types are not used by replication directly.
   * <p>
   * For last sequence ids, we will check it in serial replication, to make sure that we will
   * replicate all edits in order, so if there are stale data, the worst case is that we will stop
   * replicating as we think we still need to finish previous ranges first, although actually we
   * have already replicated them out.
   * <p>
   * For hfile references, it is just used by hfile cleaner to not remove these hfiles before we
   * replicate them out, so if there are stale data, the worst case is that we can not remove these
   * hfiles, although actually they have already been replicated out.
   * <p>
   * So it is OK for us to just bring up the cluster first, and then use this method to delete the
   * stale data, i.e., the data which are written before a specific timestamp.
   * @param ts the timestamp; data written before it is removed
   */
  void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
}