/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

040/**
041 * Interface that defines a replication source
042 */
043@InterfaceAudience.Private
044public interface ReplicationSourceInterface {
045  /**
046   * Initializer for the source
047   * @param conf the configuration to use
048   * @param fs the file system to use
049   * @param manager the manager to use
050   * @param server the server for this region server
051   */
052  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
053      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
054      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
055      MetricsSource metrics) throws IOException;
056
057  /**
058   * Add a log to the list of logs to replicate
059   * @param log path to the log to replicate
060   */
061  void enqueueLog(Path log);
062
063  /**
064   * Add hfile names to the queue to be replicated.
065   * @param tableName Name of the table these files belongs to
066   * @param family Name of the family these files belong to
067   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
068   *          will be added in the queue for replication}
069   * @throws ReplicationException If failed to add hfile references
070   */
071  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
072      throws ReplicationException;
073
074  /**
075   * Start the replication
076   */
077  ReplicationSourceInterface startup();
078
079  /**
080   * End the replication
081   * @param reason why it's terminating
082   */
083  void terminate(String reason);
084
085  /**
086   * End the replication
087   * @param reason why it's terminating
088   * @param cause the error that's causing it
089   */
090  void terminate(String reason, Exception cause);
091
092  /**
093   * End the replication
094   * @param reason why it's terminating
095   * @param cause the error that's causing it
096   * @param clearMetrics removes all metrics about this Source
097   */
098  void terminate(String reason, Exception cause, boolean clearMetrics);
099
100  /**
101   * Get the current log that's replicated
102   * @return the current log
103   */
104  Path getCurrentPath();
105
106  /**
107   * Get the queue id that the source is replicating to
108   *
109   * @return queue id
110   */
111  String getQueueId();
112
113  /**
114   * Get the id that the source is replicating to.
115   *
116   * @return peer id
117   */
118  String getPeerId();
119
120  /**
121   * Get a string representation of the current statistics
122   * for this source
123   * @return printable stats
124   */
125  String getStats();
126
127  /**
128   * @return peer enabled or not
129   */
130  boolean isPeerEnabled();
131
132  /**
133   * @return active or not
134   */
135  boolean isSourceActive();
136
137  /**
138   * @return metrics of this replication source
139   */
140  MetricsSource getSourceMetrics();
141
142  /**
143   * @return the replication endpoint used by this replication source
144   */
145  ReplicationEndpoint getReplicationEndpoint();
146
147  /**
148   * @return the replication source manager
149   */
150  ReplicationSourceManager getSourceManager();
151
152  /**
153   * @return the wal file length provider
154   */
155  WALFileLengthProvider getWALFileLengthProvider();
156
157  /**
158   * Try to throttle when the peer config with a bandwidth
159   * @param batchSize entries size will be pushed
160   */
161  void tryThrottle(int batchSize) throws InterruptedException;
162
163  /**
164   * Call this after the shipper thread ship some entries to peer cluster.
165   * @param entries pushed
166   * @param batchSize entries size pushed
167   */
168  void postShipEdits(List<Entry> entries, int batchSize);
169
170  /**
171   * The queue of WALs only belong to one region server. This will return the server name which all
172   * WALs belong to.
173   * @return the server name which all WALs belong to
174   */
175  ServerName getServerWALsBelongTo();
176
177  /**
178   * get the stat of replication for each wal group.
179   * @return stat of replication
180   */
181  default Map<String, ReplicationStatus> getWalGroupStatus() {
182    return new HashMap<>();
183  }
184
185  /**
186   * @return whether this is a replication source for recovery.
187   */
188  default boolean isRecovered() {
189    return false;
190  }
191
192  /**
193   * @return The instance of queueStorage used by this ReplicationSource.
194   */
195  ReplicationQueueStorage getReplicationQueueStorage();
196
197  /**
198   * Log the current position to storage. Also clean old logs from the replication queue.
199   * Use to bypass the default call to
200   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface,
201   * WALEntryBatch)} whem implementation does not need to persist state to backing storage.
202   * @param entryBatch the wal entry batch we just shipped
203   * @return The instance of queueStorage used by this ReplicationSource.
204   */
205  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
206    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
207  }
208}