001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
032import org.apache.hadoop.hbase.replication.ReplicationException;
033import org.apache.hadoop.hbase.replication.ReplicationPeer;
034import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * Interface that defines a replication source
041 */
042@InterfaceAudience.Private
043public interface ReplicationSourceInterface {
044  /**
045   * Initializer for the source
046   * @param conf    the configuration to use
047   * @param fs      the file system to use
048   * @param manager the manager to use
049   * @param server  the server for this region server
050   */
051  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
052    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
053    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
054    MetricsSource metrics) throws IOException;
055
056  /**
057   * Add a log to the list of logs to replicate
058   * @param log path to the log to replicate
059   */
060  void enqueueLog(Path log);
061
062  /**
063   * Add hfile names to the queue to be replicated.
064   * @param tableName Name of the table these files belongs to
065   * @param family    Name of the family these files belong to
066   * @param pairs     list of pairs of { HFile location in staging dir, HFile path in region dir
067   *                  which will be added in the queue for replication}
068   * @throws ReplicationException If failed to add hfile references
069   */
070  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
071    throws ReplicationException;
072
073  /**
074   * Start the replication
075   */
076  ReplicationSourceInterface startup();
077
078  /**
079   * End the replication
080   * @param reason why it's terminating
081   */
082  void terminate(String reason);
083
084  /**
085   * End the replication
086   * @param reason why it's terminating
087   * @param cause  the error that's causing it
088   */
089  void terminate(String reason, Exception cause);
090
091  /**
092   * End the replication
093   * @param reason       why it's terminating
094   * @param cause        the error that's causing it
095   * @param clearMetrics removes all metrics about this Source
096   */
097  void terminate(String reason, Exception cause, boolean clearMetrics);
098
099  /**
100   * Get the current log that's replicated
101   * @return the current log
102   */
103  Path getCurrentPath();
104
105  /**
106   * Get the queue id that the source is replicating to
107   * @return queue id
108   */
109  String getQueueId();
110
111  /**
112   * Get the id that the source is replicating to.
113   * @return peer id
114   */
115  default String getPeerId() {
116    return getPeer().getId();
117  }
118
119  /**
120   * Get the replication peer instance.
121   * @return the replication peer instance
122   */
123  ReplicationPeer getPeer();
124
125  /**
126   * Get a string representation of the current statistics for this source
127   * @return printable stats
128   */
129  String getStats();
130
131  /** Returns peer enabled or not */
132  default boolean isPeerEnabled() {
133    return getPeer().isPeerEnabled();
134  }
135
136  /** Returns whether this is sync replication peer. */
137  default boolean isSyncReplication() {
138    return getPeer().getPeerConfig().isSyncReplication();
139  }
140
141  /** Returns active or not */
142  boolean isSourceActive();
143
144  /** Returns metrics of this replication source */
145  MetricsSource getSourceMetrics();
146
147  /** Returns the replication endpoint used by this replication source */
148  ReplicationEndpoint getReplicationEndpoint();
149
150  /** Returns the replication source manager */
151  ReplicationSourceManager getSourceManager();
152
153  /** Returns the wal file length provider */
154  WALFileLengthProvider getWALFileLengthProvider();
155
156  /**
157   * Try to throttle when the peer config with a bandwidth
158   * @param batchSize entries size will be pushed
159   */
160  void tryThrottle(int batchSize) throws InterruptedException;
161
162  /**
163   * Call this after the shipper thread ship some entries to peer cluster.
164   * @param entries   pushed
165   * @param batchSize entries size pushed
166   */
167  void postShipEdits(List<Entry> entries, int batchSize);
168
169  /**
170   * The queue of WALs only belong to one region server. This will return the server name which all
171   * WALs belong to.
172   * @return the server name which all WALs belong to
173   */
174  ServerName getServerWALsBelongTo();
175
176  /**
177   * get the stat of replication for each wal group.
178   * @return stat of replication
179   */
180  default Map<String, ReplicationStatus> getWalGroupStatus() {
181    return new HashMap<>();
182  }
183
184  /** Returns whether this is a replication source for recovery. */
185  default boolean isRecovered() {
186    return false;
187  }
188
189  /** Returns The instance of queueStorage used by this ReplicationSource. */
190  ReplicationQueueStorage getReplicationQueueStorage();
191
192  /**
193   * Log the current position to storage. Also clean old logs from the replication queue. Use to
194   * bypass the default call to
195   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, WALEntryBatch)}
196   * whem implementation does not need to persist state to backing storage.
197   * @param entryBatch the wal entry batch we just shipped
198   * @return The instance of queueStorage used by this ReplicationSource.
199   */
200  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
201    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
202  }
203}