001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.List;
021import java.util.Map;
022import java.util.Set;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
026import org.apache.hadoop.hbase.util.Pair;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * Perform reads and writes against the replication queue storage.
031 */
032@InterfaceAudience.Private
033public interface ReplicationQueueStorage {
034
035  /**
036   * Set the current offset for a specific WAL group in a given queue.
037   * @param queueId    the id of the queue
038   * @param walGroup   the group of the WAL, can be empty if multi wal is not enabled
039   * @param offset     the current offset of replication progress
040   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
041   */
042  void setOffset(ReplicationQueueId queueId, String walGroup, ReplicationGroupOffset offset,
043    Map<String, Long> lastSeqIds) throws ReplicationException;
044
045  /**
046   * Get the current offsets of all the WAL groups for a queue.
047   * @param queueId the id of the queue
048   * @return a map of all offsets of the WAL groups. The key is the WAL group and the value is the
049   *         offset.
050   */
051  Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
052    throws ReplicationException;
053
054  /**
055   * Get a list of all queues for the specific peer.
056   * @param peerId the id of the peer
057   * @return a list of queueIds
058   */
059  List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException;
060
061  /**
062   * Get a list of all queues for the specific region server.
063   * @param serverName the server name of the region server that owns the set of queues
064   * @return a list of queueIds
065   */
066  List<ReplicationQueueId> listAllQueueIds(ServerName serverName) throws ReplicationException;
067
068  /**
069   * Get a list of all queues for the specific region server and the specific peer.
070   * @param peerId     the id of the peer
071   * @param serverName the server name of the region server that owns the set of queues
072   * @return a list of queueIds
073   */
074  List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
075    throws ReplicationException;
076
077  /**
078   * Get a list of all queues and the offsets.
079   */
080  List<ReplicationQueueData> listAllQueues() throws ReplicationException;
081
082  /**
083   * Get a list of all region servers that have outstanding replication queues. These servers could
084   * be alive, dead or from a previous run of the cluster.
085   * @return a list of server names
086   */
087  List<ServerName> listAllReplicators() throws ReplicationException;
088
089  /**
090   * Change ownership of the queue identified by queueId, which belongs to a dead region server.
091   * @param queueId          the id of the queue
092   * @param targetServerName the name of the target region server
093   * @return the offsets keyed by WAL group; presumably those of the claimed queue (TODO confirm)
094   */
095  Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
096    ServerName targetServerName) throws ReplicationException;
097
098  /**
099   * Remove a replication queue.
100   * @param queueId the id of the queue to remove
101   */
102  void removeQueue(ReplicationQueueId queueId) throws ReplicationException;
103
104  /**
105   * Remove all the replication queues for the given peer. Usually used when removing a peer.
106   * @param peerId the id of the peer
107   */
108  void removeAllQueues(String peerId) throws ReplicationException;
109
110  /**
111   * Read the max sequence id of the specific region for a given peer. For serial replication, we
112   * need the max sequence id to decide whether we can push the next entries.
113   * @param encodedRegionName the encoded region name
114   * @param peerId            peer id
115   * @return the max sequence id of the specific region for a given peer.
116   */
117  long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
118
119  /**
120   * Set the max sequence id of a bunch of regions for a given peer. Will be called when setting up
121   * a serial replication peer.
122   * @param peerId     peer id
123   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
124   */
125  void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
126
127  /**
128   * Remove all the max sequence id records for the given peer.
129   * @param peerId peer id
130   */
131  void removeLastSequenceIds(String peerId) throws ReplicationException;
132
133  /**
134   * Remove the max sequence id records for the given peer and regions.
135   * @param peerId             peer id
136   * @param encodedRegionNames the encoded region names
137   */
138  void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
139    throws ReplicationException;
140
141  /**
142   * Remove a peer from hfile reference queue.
143   * @param peerId peer cluster id to be removed
144   */
145  void removePeerFromHFileRefs(String peerId) throws ReplicationException;
146
147  /**
148   * Add new hfile references to the queue.
149   * @param peerId peer cluster id to which the hfiles need to be replicated
150   * @param pairs  list of pairs of { HFile location in staging dir, HFile path in region dir which
151   *               will be added in the queue }
152   * @throws ReplicationException if fails to add a hfile reference
153   */
154  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
155
156  /**
157   * Remove hfile references from the queue.
158   * @param peerId peer cluster id from which these hfile references need to be removed
159   * @param files  list of hfile references to be removed
160   */
161  void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;
162
163  /**
164   * Get list of all peers from hfile reference queue.
165   * @return a list of peer ids
166   */
167  List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException;
168
169  /**
170   * Get a list of all hfile references in the given peer.
171   * @param peerId a String that identifies the peer
172   * @return a list of hfile references
173   */
174  List<String> getReplicableHFiles(String peerId) throws ReplicationException;
175
176  /**
177   * Load all hfile references in all replication queues. This method guarantees to return a
178   * snapshot which contains all hfile references at the start of this call. However, some newly
179   * created hfile references during the call may not be included.
180   */
181  Set<String> getAllHFileRefs() throws ReplicationException;
182
183  /**
184   * Whether the replication queue table exists.
185   * @return Whether the replication queue table exists
186   */
187  boolean hasData() throws ReplicationException;
188
189  // The 3 methods below are used for migration.
190  /**
191   * Update the replication queue datas for a given region server.
192   */
193  void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
194    throws ReplicationException;
195
196  /**
197   * Update last pushed sequence id for the given regions and peers.
198   */
199  void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
200    throws ReplicationException;
201
202  /**
203   * Add the given hfile refs to the given peer.
204   */
205  void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
206
207  // The method below is for cleaning up stale data after running ReplicationSyncUp.
208  /**
209   * Remove all the last sequence ids and hfile references data which are written before the given
210   * timestamp.
211   * <p/>
212   * The data of these two types are not used by replication directly.
213   * <p/>
214   * For last sequence ids, we will check it in serial replication, to make sure that we will
215   * replicate all edits in order, so if there are stale data, the worst case is that we will stop
216   * replicating as we think we still need to finish previous ranges first, although actually we
217   * have already replicated them out.
218   * <p/>
219   * For hfile references, it is just used by hfile cleaner to not remove these hfiles before we
220   * replicate them out, so if there are stale data, the worst case is that we can not remove these
221   * hfiles, although actually they have already been replicated out.
222   * <p/>
223   * So it is OK for us to just bring up the cluster first, and then use this method to delete the
224   * stale data, i.e., the data which are written before a specific timestamp.
225   */
226  void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
227}