001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
032import org.apache.hadoop.hbase.replication.ReplicationException;
033import org.apache.hadoop.hbase.replication.ReplicationPeer;
034import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * Interface that defines a replication source
041 */
042@InterfaceAudience.Private
043public interface ReplicationSourceInterface {
044  /**
045   * Initializer for the source
046   * @param conf    the configuration to use
047   * @param fs      the file system to use
048   * @param manager the manager to use
049   * @param server  the server for this region server
050   */
051  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
052    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
053    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
054    MetricsSource metrics) throws IOException;
055
056  /**
057   * Add a log to the list of logs to replicate
058   * @param log path to the log to replicate
059   */
060  void enqueueLog(Path log);
061
062  /**
063   * Add hfile names to the queue to be replicated.
064   * @param tableName Name of the table these files belongs to
065   * @param family    Name of the family these files belong to
066   * @param pairs     list of pairs of { HFile location in staging dir, HFile path in region dir
067   *                  which will be added in the queue for replication}
068   * @throws ReplicationException If failed to add hfile references
069   */
070  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
071    throws ReplicationException;
072
073  /**
074   * Start the replication
075   */
076  ReplicationSourceInterface startup();
077
078  /**
079   * End the replication
080   * @param reason why it's terminating
081   */
082  void terminate(String reason);
083
084  /**
085   * End the replication
086   * @param reason why it's terminating
087   * @param cause  the error that's causing it
088   */
089  void terminate(String reason, Exception cause);
090
091  /**
092   * End the replication
093   * @param reason       why it's terminating
094   * @param cause        the error that's causing it
095   * @param clearMetrics removes all metrics about this Source
096   */
097  void terminate(String reason, Exception cause, boolean clearMetrics);
098
099  /**
100   * Get the current log that's replicated
101   * @return the current log
102   */
103  Path getCurrentPath();
104
105  /**
106   * Get the queue id that the source is replicating to
107   * @return queue id
108   */
109  String getQueueId();
110
111  /**
112   * Get the id that the source is replicating to.
113   * @return peer id
114   */
115  default String getPeerId() {
116    return getPeer().getId();
117  }
118
119  /**
120   * Get the replication peer instance.
121   * @return the replication peer instance
122   */
123  ReplicationPeer getPeer();
124
125  /**
126   * Get a string representation of the current statistics for this source
127   * @return printable stats
128   */
129  String getStats();
130
131  /**
132   * @return peer enabled or not
133   */
134  default boolean isPeerEnabled() {
135    return getPeer().isPeerEnabled();
136  }
137
138  /**
139   * @return whether this is sync replication peer.
140   */
141  default boolean isSyncReplication() {
142    return getPeer().getPeerConfig().isSyncReplication();
143  }
144
145  /**
146   * @return active or not
147   */
148  boolean isSourceActive();
149
150  /**
151   * @return metrics of this replication source
152   */
153  MetricsSource getSourceMetrics();
154
155  /**
156   * @return the replication endpoint used by this replication source
157   */
158  ReplicationEndpoint getReplicationEndpoint();
159
160  /**
161   * @return the replication source manager
162   */
163  ReplicationSourceManager getSourceManager();
164
165  /**
166   * @return the wal file length provider
167   */
168  WALFileLengthProvider getWALFileLengthProvider();
169
170  /**
171   * Try to throttle when the peer config with a bandwidth
172   * @param batchSize entries size will be pushed
173   */
174  void tryThrottle(int batchSize) throws InterruptedException;
175
176  /**
177   * Call this after the shipper thread ship some entries to peer cluster.
178   * @param entries   pushed
179   * @param batchSize entries size pushed
180   */
181  void postShipEdits(List<Entry> entries, int batchSize);
182
183  /**
184   * The queue of WALs only belong to one region server. This will return the server name which all
185   * WALs belong to.
186   * @return the server name which all WALs belong to
187   */
188  ServerName getServerWALsBelongTo();
189
190  /**
191   * get the stat of replication for each wal group.
192   * @return stat of replication
193   */
194  default Map<String, ReplicationStatus> getWalGroupStatus() {
195    return new HashMap<>();
196  }
197
198  /**
199   * @return whether this is a replication source for recovery.
200   */
201  default boolean isRecovered() {
202    return false;
203  }
204
205  /**
206   * @return The instance of queueStorage used by this ReplicationSource.
207   */
208  ReplicationQueueStorage getReplicationQueueStorage();
209
210  /**
211   * Log the current position to storage. Also clean old logs from the replication queue. Use to
212   * bypass the default call to
213   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, WALEntryBatch)}
214   * whem implementation does not need to persist state to backing storage.
215   * @param entryBatch the wal entry batch we just shipped
216   * @return The instance of queueStorage used by this ReplicationSource.
217   */
218  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
219    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
220  }
221}