001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
032import org.apache.hadoop.hbase.replication.ReplicationException;
033import org.apache.hadoop.hbase.replication.ReplicationPeer;
034import org.apache.hadoop.hbase.replication.ReplicationQueueData;
035import org.apache.hadoop.hbase.replication.ReplicationQueueId;
036import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.hadoop.hbase.wal.WAL.Entry;
039import org.apache.yetus.audience.InterfaceAudience;
040
041/**
042 * Interface that defines a replication source
043 */
044@InterfaceAudience.Private
045public interface ReplicationSourceInterface {
046  /**
047   * Initializer for the source
048   * @param conf                  the configuration to use
049   * @param fs                    the file system to use
050   * @param manager               the manager to use
051   * @param queueStorage          the replication queue storage
052   * @param replicationPeer       the replication peer
053   * @param server                the server for this region server
054   * @param queueData             the existing replication queue data, contains the queue id and
055   *                              replication start offsets
056   * @param clusterId             the cluster id
057   * @param walFileLengthProvider for getting the length of the WAL file which is currently being
058   *                              written
059   * @param metrics               the replication metrics
060   */
061  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
062    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
063    ReplicationQueueData queueData, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
064    MetricsSource metrics) throws IOException;
065
066  /**
067   * Add a log to the list of logs to replicate
068   * @param log path to the log to replicate
069   */
070  void enqueueLog(Path log);
071
072  /**
073   * Add hfile names to the queue to be replicated.
074   * @param tableName Name of the table these files belongs to
075   * @param family    Name of the family these files belong to
076   * @param pairs     list of pairs of { HFile location in staging dir, HFile path in region dir
077   *                  which will be added in the queue for replication}
078   * @throws ReplicationException If failed to add hfile references
079   */
080  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
081    throws ReplicationException;
082
083  /**
084   * Start the replication
085   */
086  ReplicationSourceInterface startup();
087
088  /**
089   * End the replication
090   * @param reason why it's terminating
091   */
092  void terminate(String reason);
093
094  /**
095   * End the replication
096   * @param reason why it's terminating
097   * @param cause  the error that's causing it
098   */
099  void terminate(String reason, Exception cause);
100
101  /**
102   * End the replication
103   * @param reason       why it's terminating
104   * @param cause        the error that's causing it
105   * @param clearMetrics removes all metrics about this Source
106   */
107  void terminate(String reason, Exception cause, boolean clearMetrics);
108
109  /**
110   * Get the current log that's replicated
111   * @return the current log
112   */
113  Path getCurrentPath();
114
115  /**
116   * Get the queue id that the source is replicating to
117   * @return queue id
118   */
119  ReplicationQueueId getQueueId();
120
121  /**
122   * Get the id that the source is replicating to.
123   * @return peer id
124   */
125  default String getPeerId() {
126    return getQueueId().getPeerId();
127  }
128
129  /**
130   * Get the replication peer instance.
131   * @return the replication peer instance
132   */
133  ReplicationPeer getPeer();
134
135  /**
136   * Get a string representation of the current statistics for this source
137   * @return printable stats
138   */
139  String getStats();
140
141  /** Returns peer enabled or not */
142  default boolean isPeerEnabled() {
143    return getPeer().isPeerEnabled();
144  }
145
146  /** Returns whether this is sync replication peer. */
147  default boolean isSyncReplication() {
148    return getPeer().getPeerConfig().isSyncReplication();
149  }
150
151  /** Returns active or not */
152  boolean isSourceActive();
153
154  /** Returns metrics of this replication source */
155  MetricsSource getSourceMetrics();
156
157  /** Returns the replication endpoint used by this replication source */
158  ReplicationEndpoint getReplicationEndpoint();
159
160  /** Returns the replication source manager */
161  ReplicationSourceManager getSourceManager();
162
163  /** Returns the wal file length provider */
164  WALFileLengthProvider getWALFileLengthProvider();
165
166  /**
167   * Try to throttle when the peer config with a bandwidth
168   * @param batchSize entries size will be pushed
169   */
170  void tryThrottle(int batchSize) throws InterruptedException;
171
172  /**
173   * Call this after the shipper thread ship some entries to peer cluster.
174   * @param entries   pushed
175   * @param batchSize entries size pushed
176   */
177  void postShipEdits(List<Entry> entries, long batchSize);
178
179  /**
180   * The queue of WALs only belong to one region server. This will return the server name which all
181   * WALs belong to.
182   * @return the server name which all WALs belong to
183   */
184  ServerName getServerWALsBelongTo();
185
186  /**
187   * get the stat of replication for each wal group.
188   * @return stat of replication
189   */
190  default Map<String, ReplicationStatus> getWalGroupStatus() {
191    return new HashMap<>();
192  }
193
194  /** Returns whether this is a replication source for recovery. */
195  default boolean isRecovered() {
196    return getQueueId().isRecovered();
197  }
198
199  /** Returns The instance of queueStorage used by this ReplicationSource. */
200  ReplicationQueueStorage getReplicationQueueStorage();
201
202  /**
203   * Log the current position to storage. Also clean old logs from the replication queue. Use to
204   * bypass the default call to
205   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, WALEntryBatch)}
206   * whem implementation does not need to persist state to backing storage.
207   * @param entryBatch the wal entry batch we just shipped
208   * @return The instance of queueStorage used by this ReplicationSource.
209   */
210  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
211    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
212  }
213}