001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.UUID; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 032import org.apache.hadoop.hbase.replication.ReplicationException; 033import org.apache.hadoop.hbase.replication.ReplicationPeer; 034import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.hadoop.hbase.wal.WAL.Entry; 037import org.apache.yetus.audience.InterfaceAudience; 038 039/** 040 * Interface that defines a replication source 041 */ 042@InterfaceAudience.Private 043public interface ReplicationSourceInterface { 044 /** 045 * Initializer for the source 046 * @param conf the configuration to use 047 * @param fs the file system to use 048 * @param manager the manager to use 049 * @param server the server for this region server 050 */ 051 void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, 052 ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, 053 String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, 054 MetricsSource metrics) throws IOException; 055 056 /** 057 * Add a log to the list of logs to replicate 058 * @param log path to the log to replicate 059 */ 060 void enqueueLog(Path log); 061 062 /** 063 * Add hfile names to the queue to be replicated. 064 * @param tableName Name of the table these files belongs to 065 * @param family Name of the family these files belong to 066 * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir 067 * which will be added in the queue for replication} 068 * @throws ReplicationException If failed to add hfile references 069 */ 070 void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) 071 throws ReplicationException; 072 073 /** 074 * Start the replication 075 */ 076 ReplicationSourceInterface startup(); 077 078 /** 079 * End the replication 080 * @param reason why it's terminating 081 */ 082 void terminate(String reason); 083 084 /** 085 * End the replication 086 * @param reason why it's terminating 087 * @param cause the error that's causing it 088 */ 089 void terminate(String reason, Exception cause); 090 091 /** 092 * End the replication 093 * @param reason why it's terminating 094 * @param cause the error that's causing it 095 * @param clearMetrics removes all metrics about this Source 096 */ 097 void terminate(String reason, Exception cause, boolean clearMetrics); 098 099 /** 100 * Get the current log that's replicated 101 * @return the current log 102 */ 103 Path getCurrentPath(); 104 105 /** 106 * Get the queue id that the source is replicating to 107 * @return queue id 108 */ 109 String getQueueId(); 110 111 /** 112 * Get the id that the source is replicating to. 113 * @return peer id 114 */ 115 default String getPeerId() { 116 return getPeer().getId(); 117 } 118 119 /** 120 * Get the replication peer instance. 121 * @return the replication peer instance 122 */ 123 ReplicationPeer getPeer(); 124 125 /** 126 * Get a string representation of the current statistics for this source 127 * @return printable stats 128 */ 129 String getStats(); 130 131 /** Returns peer enabled or not */ 132 default boolean isPeerEnabled() { 133 return getPeer().isPeerEnabled(); 134 } 135 136 /** Returns whether this is sync replication peer. */ 137 default boolean isSyncReplication() { 138 return getPeer().getPeerConfig().isSyncReplication(); 139 } 140 141 /** Returns active or not */ 142 boolean isSourceActive(); 143 144 /** Returns metrics of this replication source */ 145 MetricsSource getSourceMetrics(); 146 147 /** Returns the replication endpoint used by this replication source */ 148 ReplicationEndpoint getReplicationEndpoint(); 149 150 /** Returns the replication source manager */ 151 ReplicationSourceManager getSourceManager(); 152 153 /** Returns the wal file length provider */ 154 WALFileLengthProvider getWALFileLengthProvider(); 155 156 /** 157 * Try to throttle when the peer config with a bandwidth 158 * @param batchSize entries size will be pushed 159 */ 160 void tryThrottle(int batchSize) throws InterruptedException; 161 162 /** 163 * Call this after the shipper thread ship some entries to peer cluster. 164 * @param entries pushed 165 * @param batchSize entries size pushed 166 */ 167 void postShipEdits(List<Entry> entries, int batchSize); 168 169 /** 170 * The queue of WALs only belong to one region server. This will return the server name which all 171 * WALs belong to. 172 * @return the server name which all WALs belong to 173 */ 174 ServerName getServerWALsBelongTo(); 175 176 /** 177 * get the stat of replication for each wal group. 178 * @return stat of replication 179 */ 180 default Map<String, ReplicationStatus> getWalGroupStatus() { 181 return new HashMap<>(); 182 } 183 184 /** Returns whether this is a replication source for recovery. */ 185 default boolean isRecovered() { 186 return false; 187 } 188 189 /** Returns The instance of queueStorage used by this ReplicationSource. */ 190 ReplicationQueueStorage getReplicationQueueStorage(); 191 192 /** 193 * Log the current position to storage. Also clean old logs from the replication queue. Use to 194 * bypass the default call to 195 * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, WALEntryBatch)} 196 * whem implementation does not need to persist state to backing storage. 197 * @param entryBatch the wal entry batch we just shipped 198 * @return The instance of queueStorage used by this ReplicationSource. 199 */ 200 default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { 201 getSourceManager().logPositionAndCleanOldLogs(this, entryBatch); 202 } 203}