001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.regionserver; 020 021import java.io.IOException; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.UUID; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Server; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 033import org.apache.hadoop.hbase.replication.ReplicationException; 034import org.apache.hadoop.hbase.replication.ReplicationPeer; 035import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.hadoop.hbase.wal.WAL.Entry; 038import org.apache.yetus.audience.InterfaceAudience; 039 040/** 041 * Interface that defines a replication source 042 */ 043@InterfaceAudience.Private 044public interface ReplicationSourceInterface { 045 /** 046 * Initializer for the source 047 * @param conf the configuration to use 048 * @param fs the file system to use 049 * @param manager the manager to use 050 * @param server the server for this region server 051 */ 052 void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, 053 ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, 054 String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, 055 MetricsSource metrics) throws IOException; 056 057 /** 058 * Add a log to the list of logs to replicate 059 * @param log path to the log to replicate 060 */ 061 void enqueueLog(Path log); 062 063 /** 064 * Add hfile names to the queue to be replicated. 065 * @param tableName Name of the table these files belongs to 066 * @param family Name of the family these files belong to 067 * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which 068 * will be added in the queue for replication} 069 * @throws ReplicationException If failed to add hfile references 070 */ 071 void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) 072 throws ReplicationException; 073 074 /** 075 * Start the replication 076 */ 077 ReplicationSourceInterface startup(); 078 079 /** 080 * End the replication 081 * @param reason why it's terminating 082 */ 083 void terminate(String reason); 084 085 /** 086 * End the replication 087 * @param reason why it's terminating 088 * @param cause the error that's causing it 089 */ 090 void terminate(String reason, Exception cause); 091 092 /** 093 * End the replication 094 * @param reason why it's terminating 095 * @param cause the error that's causing it 096 * @param clearMetrics removes all metrics about this Source 097 */ 098 void terminate(String reason, Exception cause, boolean clearMetrics); 099 100 /** 101 * Get the current log that's replicated 102 * @return the current log 103 */ 104 Path getCurrentPath(); 105 106 /** 107 * Get the queue id that the source is replicating to 108 * 109 * @return queue id 110 */ 111 String getQueueId(); 112 113 /** 114 * Get the id that the source is replicating to. 115 * 116 * @return peer id 117 */ 118 String getPeerId(); 119 120 /** 121 * Get a string representation of the current statistics 122 * for this source 123 * @return printable stats 124 */ 125 String getStats(); 126 127 /** 128 * @return peer enabled or not 129 */ 130 boolean isPeerEnabled(); 131 132 /** 133 * @return active or not 134 */ 135 boolean isSourceActive(); 136 137 /** 138 * @return metrics of this replication source 139 */ 140 MetricsSource getSourceMetrics(); 141 142 /** 143 * @return the replication endpoint used by this replication source 144 */ 145 ReplicationEndpoint getReplicationEndpoint(); 146 147 /** 148 * @return the replication source manager 149 */ 150 ReplicationSourceManager getSourceManager(); 151 152 /** 153 * @return the wal file length provider 154 */ 155 WALFileLengthProvider getWALFileLengthProvider(); 156 157 /** 158 * Try to throttle when the peer config with a bandwidth 159 * @param batchSize entries size will be pushed 160 */ 161 void tryThrottle(int batchSize) throws InterruptedException; 162 163 /** 164 * Call this after the shipper thread ship some entries to peer cluster. 165 * @param entries pushed 166 * @param batchSize entries size pushed 167 */ 168 void postShipEdits(List<Entry> entries, int batchSize); 169 170 /** 171 * The queue of WALs only belong to one region server. This will return the server name which all 172 * WALs belong to. 173 * @return the server name which all WALs belong to 174 */ 175 ServerName getServerWALsBelongTo(); 176 177 /** 178 * get the stat of replication for each wal group. 179 * @return stat of replication 180 */ 181 default Map<String, ReplicationStatus> getWalGroupStatus() { 182 return new HashMap<>(); 183 } 184 185 /** 186 * @return whether this is a replication source for recovery. 187 */ 188 default boolean isRecovered() { 189 return false; 190 } 191 192 /** 193 * @return The instance of queueStorage used by this ReplicationSource. 194 */ 195 ReplicationQueueStorage getReplicationQueueStorage(); 196 197 /** 198 * Log the current position to storage. Also clean old logs from the replication queue. 199 * Use to bypass the default call to 200 * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, 201 * WALEntryBatch)} whem implementation does not need to persist state to backing storage. 202 * @param entryBatch the wal entry batch we just shipped 203 * @return The instance of queueStorage used by this ReplicationSource. 204 */ 205 default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { 206 getSourceManager().logPositionAndCleanOldLogs(this, entryBatch); 207 } 208}