/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Interface that defines a replication source
 */
@InterfaceAudience.Private
public interface ReplicationSourceInterface {
  /**
   * Initializer for the source
   * @param conf                  the configuration to use
   * @param fs                    the file system to use
   * @param manager               the manager to use
   * @param queueStorage          the replication queue storage
   * @param replicationPeer       the replication peer
   * @param server                the server for this region server
   * @param queueData             the existing replication queue data, contains the queue id and
   *                              replication start offsets
   * @param clusterId             the cluster id
   * @param walFileLengthProvider for getting the length of the WAL file which is currently being
   *                              written
   * @param metrics               the replication metrics
   */
  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    ReplicationQueueData queueData, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException;

  /**
   * Add a log to the list of logs to replicate
   * @param log path to the log to replicate
   */
  void enqueueLog(Path log);

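  // Illustrative sketch only, not part of the interface contract: a source is typically created
  // and driven by the ReplicationSourceManager, roughly along these lines. The variable names
  // below (conf, fs, manager, walPath, ...) are placeholders, not identifiers defined in this
  // file.
  //
  //   ReplicationSourceInterface source = ...; // concrete implementation chosen by the manager
  //   source.init(conf, fs, manager, queueStorage, peer, server, queueData, clusterId,
  //     walFileLengthProvider, metrics);
  //   source.enqueueLog(walPath); // hand the source a WAL to replicate
  //   source.startup();           // start shipping edits to the peer cluster
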
  /**
   * Add hfile names to the queue to be replicated.
   * @param tableName Name of the table these files belong to
   * @param family    Name of the family these files belong to
   * @param pairs     list of pairs of { HFile location in staging dir, HFile path in region dir
   *                  which will be added in the queue for replication}
   * @throws ReplicationException If failed to add hfile references
   */
  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws ReplicationException;

  /**
   * Start the replication
   */
  ReplicationSourceInterface startup();

  /**
   * End the replication
   * @param reason why it's terminating
   */
  void terminate(String reason);

  /**
   * End the replication
   * @param reason why it's terminating
   * @param cause  the error that's causing it
   */
  void terminate(String reason, Exception cause);

  /**
   * End the replication
   * @param reason       why it's terminating
   * @param cause        the error that's causing it
   * @param clearMetrics whether to remove all metrics about this source
   */
  void terminate(String reason, Exception cause, boolean clearMetrics);

  /**
   * Get the current log that's being replicated
   * @return the current log
   */
  Path getCurrentPath();

  /**
   * Get the id of the replication queue this source is associated with
   * @return queue id
   */
  ReplicationQueueId getQueueId();

  /**
   * Get the id of the peer cluster that the source is replicating to.
   * @return peer id
   */
  default String getPeerId() {
    return getQueueId().getPeerId();
  }

  /**
   * Get the replication peer instance.
   * @return the replication peer instance
   */
  ReplicationPeer getPeer();

  /**
   * Get a string representation of the current statistics for this source
   * @return printable stats
   */
  String getStats();

  /** Returns whether the peer is enabled or not */
  default boolean isPeerEnabled() {
    return getPeer().isPeerEnabled();
  }

  /** Returns whether this is a sync replication peer. */
  default boolean isSyncReplication() {
    return getPeer().getPeerConfig().isSyncReplication();
  }

  /** Returns whether this source is active or not */
  boolean isSourceActive();

  /** Returns metrics of this replication source */
  MetricsSource getSourceMetrics();

  /** Returns the replication endpoint used by this replication source */
  ReplicationEndpoint getReplicationEndpoint();

  /** Returns the replication source manager */
  ReplicationSourceManager getSourceManager();

  /** Returns the wal file length provider */
  WALFileLengthProvider getWALFileLengthProvider();

  /**
   * Try to throttle when the peer is configured with a bandwidth limit
   * @param batchSize the size of the entries that will be pushed
   */
  void tryThrottle(int batchSize) throws InterruptedException;

  /**
   * Call this after the shipper thread ships some entries to the peer cluster.
   * @param entries   the entries that were pushed
   * @param batchSize the total size of the entries that were pushed
   */
  void postShipEdits(List<Entry> entries, long batchSize);

  /**
   * The queue of WALs only belongs to one region server. This will return the server name which
   * all the WALs belong to.
   * @return the server name which all WALs belong to
   */
  ServerName getServerWALsBelongTo();

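  // Illustrative sketch only: a shipper thread inside an implementation would typically combine
  // the hooks above roughly as follows. The entries, size and batch variables are placeholders
  // for whatever the implementation has just read from the WAL.
  //
  //   if (isPeerEnabled()) {
  //     tryThrottle(size);                 // honor the peer's bandwidth limit, if configured
  //     // ... ship the entries, typically through getReplicationEndpoint() ...
  //     postShipEdits(entries, size);      // report what was shipped
  //     logPositionAndCleanOldLogs(batch); // log the new position and clean old logs
  //   }
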
  /**
   * Get the replication status of each wal group.
   * @return stat of replication
   */
  default Map<String, ReplicationStatus> getWalGroupStatus() {
    return new HashMap<>();
  }

  /** Returns whether this is a replication source for recovery. */
  default boolean isRecovered() {
    return getQueueId().isRecovered();
  }

  /** Returns the instance of queueStorage used by this ReplicationSource. */
  ReplicationQueueStorage getReplicationQueueStorage();

  /**
   * Log the current position to storage. Also clean old logs from the replication queue. Can be
   * overridden to bypass the default call to
   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, WALEntryBatch)}
   * when the implementation does not need to persist state to backing storage.
   * @param entryBatch the wal entry batch we just shipped
   */
  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
  }
}