001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.UUID; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 032import org.apache.hadoop.hbase.replication.ReplicationException; 033import org.apache.hadoop.hbase.replication.ReplicationPeer; 034import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.hadoop.hbase.wal.WAL.Entry; 037import org.apache.yetus.audience.InterfaceAudience; 038 039/** 040 * Interface that defines a replication source 041 */ 042@InterfaceAudience.Private 043public interface ReplicationSourceInterface { 044 /** 045 * Initializer for the source 046 * @param conf the configuration to use 047 * @param fs the file system to use 048 * @param manager the manager to use 049 * @param server the server for this region server 050 */ 051 void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, 052 ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, 053 String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, 054 MetricsSource metrics) throws IOException; 055 056 /** 057 * Add a log to the list of logs to replicate 058 * @param log path to the log to replicate 059 */ 060 void enqueueLog(Path log); 061 062 /** 063 * Add hfile names to the queue to be replicated. 064 * @param tableName Name of the table these files belongs to 065 * @param family Name of the family these files belong to 066 * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir 067 * which will be added in the queue for replication} 068 * @throws ReplicationException If failed to add hfile references 069 */ 070 void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) 071 throws ReplicationException; 072 073 /** 074 * Start the replication 075 */ 076 ReplicationSourceInterface startup(); 077 078 /** 079 * End the replication 080 * @param reason why it's terminating 081 */ 082 void terminate(String reason); 083 084 /** 085 * End the replication 086 * @param reason why it's terminating 087 * @param cause the error that's causing it 088 */ 089 void terminate(String reason, Exception cause); 090 091 /** 092 * End the replication 093 * @param reason why it's terminating 094 * @param cause the error that's causing it 095 * @param clearMetrics removes all metrics about this Source 096 */ 097 void terminate(String reason, Exception cause, boolean clearMetrics); 098 099 /** 100 * Get the current log that's replicated 101 * @return the current log 102 */ 103 Path getCurrentPath(); 104 105 /** 106 * Get the queue id that the source is replicating to 107 * @return queue id 108 */ 109 String getQueueId(); 110 111 /** 112 * Get the id that the source is replicating to. 113 * @return peer id 114 */ 115 String getPeerId(); 116 117 /** 118 * Get a string representation of the current statistics for this source 119 * @return printable stats 120 */ 121 String getStats(); 122 123 /** Returns peer enabled or not */ 124 boolean isPeerEnabled(); 125 126 /** Returns active or not */ 127 boolean isSourceActive(); 128 129 /** Returns metrics of this replication source */ 130 MetricsSource getSourceMetrics(); 131 132 /** Returns the replication endpoint used by this replication source */ 133 ReplicationEndpoint getReplicationEndpoint(); 134 135 /** Returns the replication source manager */ 136 ReplicationSourceManager getSourceManager(); 137 138 /** Returns the wal file length provider */ 139 WALFileLengthProvider getWALFileLengthProvider(); 140 141 /** 142 * Try to throttle when the peer config with a bandwidth 143 * @param batchSize entries size will be pushed 144 */ 145 void tryThrottle(int batchSize) throws InterruptedException; 146 147 /** 148 * Call this after the shipper thread ship some entries to peer cluster. 149 * @param entries pushed 150 * @param batchSize entries size pushed 151 */ 152 void postShipEdits(List<Entry> entries, int batchSize); 153 154 /** 155 * The queue of WALs only belong to one region server. This will return the server name which all 156 * WALs belong to. 157 * @return the server name which all WALs belong to 158 */ 159 ServerName getServerWALsBelongTo(); 160 161 /** 162 * get the stat of replication for each wal group. 163 * @return stat of replication 164 */ 165 default Map<String, ReplicationStatus> getWalGroupStatus() { 166 return new HashMap<>(); 167 } 168 169 /** Returns whether this is a replication source for recovery. */ 170 default boolean isRecovered() { 171 return false; 172 } 173 174 /** Returns The instance of queueStorage used by this ReplicationSource. */ 175 ReplicationQueueStorage getReplicationQueueStorage(); 176 177 /** 178 * Log the current position to storage. Also clean old logs from the replication queue. Use to 179 * bypass the default call to 180 * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, WALEntryBatch)} 181 * whem implementation does not need to persist state to backing storage. 182 * @param entryBatch the wal entry batch we just shipped 183 * @return The instance of queueStorage used by this ReplicationSource. 184 */ 185 default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) { 186 getSourceManager().logPositionAndCleanOldLogs(this, entryBatch); 187 } 188}