001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.List;
021import java.util.Map;
022import java.util.Set;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
026import org.apache.hadoop.hbase.util.Pair;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * Perform read/write to the replication queue storage.
031 */
032@InterfaceAudience.Private
033public interface ReplicationQueueStorage {
034
035  /**
036   * Set the current offset for a specific WAL group in a given queue.
037   * @param queueId    the id of the queue
038   * @param walGroup   the group of the WAL, can be empty if multi wal is not enabled
039   * @param offset     the current offset of replication progress
040   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
041   */
042  void setOffset(ReplicationQueueId queueId, String walGroup, ReplicationGroupOffset offset,
043    Map<String, Long> lastSeqIds) throws ReplicationException;
044
045  /**
046   * Get the current offsets of all the WAL groups for a queue.
047   * @param queueId the id of the queue
048   * @return a map of all offsets of the WAL groups. The key is the WAL group and the value is
049   *         its offset.
050   */
051  Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
052    throws ReplicationException;
053
054  /**
055   * Get a list of all queues for the specific peer.
056   * @param peerId the id of the peer
057   * @return a list of queueIds
058   */
059  List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException;
060
061  /**
062   * Get a list of all queues for the specific region server.
063   * @param serverName the server name of the region server that owns the set of queues
064   * @return a list of queueIds
065   */
066  List<ReplicationQueueId> listAllQueueIds(ServerName serverName) throws ReplicationException;
067
068  /**
069   * Get a list of all queues for the specific region server and the specific peer.
070   * @param peerId     the id of the peer
071   * @param serverName the server name of the region server that owns the set of queues
072   * @return a list of queueIds
073   */
074  List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
075    throws ReplicationException;
076
077  /**
078   * Get a list of all peer ids.
079   * <p>
080   * This method is designed for HBCK, where we may have some dirty data left in the storage after a
081   * broken procedure run. In normal logic, you should call
082   * {@link ReplicationPeerStorage#listPeerIds()} for getting all the replication peer ids.
083   */
084  List<String> listAllPeerIds() throws ReplicationException;
085
086  /**
087   * Get a list of all queues and the offsets.
088   */
089  List<ReplicationQueueData> listAllQueues() throws ReplicationException;
090
091  /**
092   * Get a list of all region servers that have outstanding replication queues. These servers could
093   * be alive, dead or from a previous run of the cluster.
094   * @return a list of server names
095   */
096  List<ServerName> listAllReplicators() throws ReplicationException;
097
098  /**
099   * Change ownership of the queue identified by queueId, which belongs to a dead region server.
100   * @param queueId          the id of the queue
101   * @param targetServerName the name of the target region server
102   * @return a map from WAL group to offset, presumably for the claimed queue (TODO confirm)
103   */
104  Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
105    ServerName targetServerName) throws ReplicationException;
106
107  /**
108   * Remove a replication queue.
109   * @param queueId the id of the queue to remove
110   */
111  void removeQueue(ReplicationQueueId queueId) throws ReplicationException;
112
113  /**
114   * Remove all the replication queues for the given peer. Usually used when removing a peer.
115   * @param peerId the id of the peer
116   */
117  void removeAllQueues(String peerId) throws ReplicationException;
118
119  /**
120   * Read the max sequence id of the specific region for a given peer. For serial replication, we
121   * need the max sequence id to decide whether we can push the next entries.
122   * @param encodedRegionName the encoded region name
123   * @param peerId            peer id
124   * @return the max sequence id of the specific region for a given peer.
125   */
126  long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
127
128  /**
129   * Set the max sequence id of a bunch of regions for a given peer. Will be called when setting up
130   * a serial replication peer.
131   * @param peerId     peer id
132   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
133   */
134  void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
135
136  /**
137   * Remove all the max sequence id records for the given peer.
138   * @param peerId peer id
139   */
140  void removeLastSequenceIds(String peerId) throws ReplicationException;
141
142  /**
143   * Remove the max sequence id records for the given peer and regions.
144   * @param peerId             peer id
145   * @param encodedRegionNames the encoded region names
146   */
147  void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
148    throws ReplicationException;
149
150  /**
151   * Remove a peer from hfile reference queue.
152   * @param peerId peer cluster id to be removed
153   */
154  void removePeerFromHFileRefs(String peerId) throws ReplicationException;
155
156  /**
157   * Add new hfile references to the queue.
158   * @param peerId peer cluster id to which the hfiles need to be replicated
159   * @param pairs  list of pairs of { HFile location in staging dir, HFile path in region dir which
160   *               will be added in the queue }
161   * @throws ReplicationException if fails to add a hfile reference
162   */
163  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
164
165  /**
166   * Remove hfile references from the queue.
167   * @param peerId peer cluster id from which these hfile references need to be removed
168   * @param files  list of hfile references to be removed
169   */
170  void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;
171
172  /**
173   * Get list of all peers from hfile reference queue.
174   * @return a list of peer ids
175   */
176  List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException;
177
178  /**
179   * Get a list of all hfile references in the given peer.
180   * @param peerId a String that identifies the peer
181   * @return a list of hfile references
182   */
183  List<String> getReplicableHFiles(String peerId) throws ReplicationException;
184
185  /**
186   * Load all hfile references in all replication queues. This method guarantees to return a
187   * snapshot which contains all hfile references at the start of this call. However, some newly
188   * created hfile references during the call may not be included.
189   */
190  Set<String> getAllHFileRefs() throws ReplicationException;
191
192  /**
193   * Whether the replication queue table exists.
194   * @return Whether the replication queue table exists
195   */
196  boolean hasData() throws ReplicationException;
197
198  // the below 3 methods are used for migrating
199  /**
200   * Update the replication queue data for a given region server.
201   */
202  void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
203    throws ReplicationException;
204
205  /**
206   * Update last pushed sequence id for the given regions and peers.
207   */
208  void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
209    throws ReplicationException;
210
211  /**
212   * Add the given hfile refs to the given peer.
213   */
214  void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
215
216  // the below method is for cleaning up stale data after running ReplicationSyncUp
217  /**
218   * Remove all the last sequence ids and hfile references data which are written before the given
219   * timestamp.
220   * <p>
221   * The data of these two types are not used by replication directly.
222   * <p>
223   * For last sequence ids, we will check it in serial replication, to make sure that we will
224   * replicate all edits in order, so if there are stale data, the worst case is that we will stop
225   * replicating as we think we still need to finish previous ranges first, although actually we
226   * have already replicated them out.
227   * <p>
228   * For hfile references, it is just used by hfile cleaner to not remove these hfiles before we
229   * replicate them out, so if there are stale data, the worst case is that we can not remove these
230   * hfiles, although actually they have already been replicated out.
231   * <p>
232   * So it is OK for us to just bring up the cluster first, and then use this method to delete the
233   * stale data, i.e., the data which are written before a specific timestamp.
234   */
235  void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
236}