/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;

/**
 * ReplicationEndpoint is a plugin which implements replication
 * to other HBase clusters, or other systems. ReplicationEndpoint implementation
 * can be specified at the peer creation time by specifying it
 * in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread
 * in each region server in the same process.
 * <p>
 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
 * relation. ReplicationSource is an HBase-private class which tails the logs and manages
 * the queue of logs plus management and persistence of all the state for replication.
 * ReplicationEndpoint on the other hand is responsible for doing the actual shipping
 * and persisting of the WAL entries in the other cluster.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
  // TODO: This class needs doc. Has a Context and a ReplicateContext. Then has #start, #stop.
  // How they relate? Do we #start before #init(Context)? We fail fast if you don't?

  /**
   * Holder of the objects handed to an endpoint through
   * {@link ReplicationEndpoint#init(Context)}. Every reference is fixed at construction time;
   * the class only exposes getters.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
  class Context {
    private final Configuration localConf;
    private final Configuration conf;
    private final FileSystem fs;
    private final TableDescriptors tableDescriptors;
    private final ReplicationPeer replicationPeer;
    private final String peerId;
    private final UUID clusterId;
    private final MetricsSource metrics;
    private final Abortable abortable;

    /**
     * Creates a context that simply stores the given references; no argument is validated
     * or copied.
     */
    @InterfaceAudience.Private
    public Context(
        final Configuration localConf,
        final Configuration conf,
        final FileSystem fs,
        final String peerId,
        final UUID clusterId,
        final ReplicationPeer replicationPeer,
        final MetricsSource metrics,
        final TableDescriptors tableDescriptors,
        final Abortable abortable) {
      this.localConf = localConf;
      this.conf = conf;
      this.fs = fs;
      this.clusterId = clusterId;
      this.peerId = peerId;
      this.replicationPeer = replicationPeer;
      this.metrics = metrics;
      this.tableDescriptors = tableDescriptors;
      this.abortable = abortable;
    }

    /** @return the configuration supplied as {@code conf} at construction time */
    public Configuration getConfiguration() {
      return conf;
    }

    /** @return the configuration supplied as {@code localConf} at construction time */
    public Configuration getLocalConfiguration() {
      return localConf;
    }

    /** @return the file system supplied at construction time */
    public FileSystem getFilesystem() {
      return fs;
    }

    /** @return the cluster id supplied at construction time */
    public UUID getClusterId() {
      return clusterId;
    }

    /** @return the peer id supplied at construction time */
    public String getPeerId() {
      return peerId;
    }

    /**
     * @return the peer configuration, read from the replication peer on every call rather
     *         than cached in this context
     */
    public ReplicationPeerConfig getPeerConfig() {
      return replicationPeer.getPeerConfig();
    }

    /** @return the replication peer supplied at construction time */
    public ReplicationPeer getReplicationPeer() {
      return replicationPeer;
    }

    /** @return the metrics source supplied at construction time */
    public MetricsSource getMetrics() {
      return metrics;
    }

    /** @return the table descriptors supplied at construction time */
    public TableDescriptors getTableDescriptors() {
      return tableDescriptors;
    }

    /** @return the abortable supplied at construction time */
    public Abortable getAbortable() {
      return abortable;
    }
  }

  /**
   * Initialize the replication endpoint with the given context.
   * @param context replication context
   * @throws IOException if the endpoint cannot be initialized
   */
  void init(Context context) throws IOException;

  /**
   * Whether or not the replication endpoint can replicate to its source cluster with the same
   * UUID.
   */
  boolean canReplicateToSameCluster();

  /**
   * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted
   * associated UUID. If the replication is not performed to an actual HBase cluster (but
   * some other system), the UUID returned has to uniquely identify the connected target system.
   * @return a UUID or null if the peer cluster does not exist or is not connected.
   */
  UUID getPeerUUID();

  /**
   * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
   * infrastructure will call this filter before sending the edits to shipEdits().
   * @return a {@link WALEntryFilter} or null.
   */
  WALEntryFilter getWALEntryfilter();

  /**
   * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
   * <p>
   * The setters return {@code this} so that calls can be chained.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
  class ReplicateContext {
    // Fields are intentionally left package-private and non-final: they are populated through
    // the chained setters below after construction.
    List<Entry> entries;
    int size;
    String walGroupId;

    @InterfaceAudience.Private
    public ReplicateContext() {
    }

    /**
     * @param entries the WAL entries to carry; the list reference is stored as-is, not copied
     * @return this context
     */
    public ReplicateContext setEntries(List<Entry> entries) {
      this.entries = entries;
      return this;
    }

    /**
     * @param size the size value to carry (NOTE(review): unit is not visible here - confirm
     *          against callers)
     * @return this context
     */
    public ReplicateContext setSize(int size) {
      this.size = size;
      return this;
    }

    /**
     * @param walGroupId the WAL group id to carry
     * @return this context
     */
    public ReplicateContext setWalGroupId(String walGroupId) {
      this.walGroupId = walGroupId;
      return this;
    }

    /** @return the entries last set, or null if none were set */
    public List<Entry> getEntries() {
      return entries;
    }

    /** @return the size last set, or 0 if none was set */
    public int getSize() {
      return size;
    }

    /** @return the WAL group id last set, or null if none was set */
    public String getWalGroupId() {
      return walGroupId;
    }
  }

  /**
   * Replicate the given set of entries (in the context) to the other cluster.
   * Can block until all the given entries are replicated. Once this method returns,
   * all entries that were passed in the context are assumed to be persisted in the
   * target cluster.
   * @param replicateContext a context where WAL entries and other
   * parameters can be obtained.
   * @return presumably {@code true} when the entries were replicated successfully
   *         (NOTE(review): the meaning of the result is not stated in this interface - confirm
   *         against callers)
   */
  boolean replicate(ReplicateContext replicateContext);


  // The below methods are inspired by Guava Service. See
  // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
  // Below we implement a subset only with different names on some methods so we can implement
  // the below internally using Guava (without exposing our implementation to
  // ReplicationEndpoint implementors).

  /**
   * Returns {@code true} if this service is RUNNING.
   */
  boolean isRunning();

  /**
   * @return {@code true} if this service is STARTING (but not yet RUNNING).
   */
  boolean isStarting();

  /**
   * Initiates service startup and returns immediately. A stopped service may not be restarted.
   * Equivalent of startAsync call in Guava Service.
   * @throws IllegalStateException if the service is not new, if it has been run already.
   */
  void start();

  /**
   * Waits for the {@link ReplicationEndpoint} to be up and running.
   *
   * @throws IllegalStateException if the service reaches a state from which it is not possible to
   *     enter the (internal) running state. e.g. if the state is terminated when this method is
   *     called then this will throw an IllegalStateException.
   */
  void awaitRunning();

  /**
   * Waits for the {@link ReplicationEndpoint} to be up and running for no more
   * than the given time.
   *
   * @param timeout the maximum time to wait
   * @param unit the time unit of the timeout argument
   * @throws TimeoutException if the service has not reached the given state within the deadline
   * @throws IllegalStateException if the service reaches a state from which it is not possible to
   *     enter the (internal) running state. e.g. if the state is terminated when this method is
   *     called then this will throw an IllegalStateException.
   */
  void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;

  /**
   * If the service is starting or running, this initiates service shutdown and returns immediately.
   * If the service has already been stopped, this method returns immediately without taking action.
   * Equivalent of stopAsync call in Guava Service.
   */
  void stop();

  /**
   * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state.
   *
   * @throws IllegalStateException if the service FAILED.
   */
  void awaitTerminated();

  /**
   * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no
   * more than the given time.
   *
   * @param timeout the maximum time to wait
   * @param unit the time unit of the timeout argument
   * @throws TimeoutException if the service has not reached the given state within the deadline
   * @throws IllegalStateException if the service FAILED.
   */
  void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;

  /**
   * Returns the {@link Throwable} that caused this service to fail.
   *
   * @throws IllegalStateException if this service's state isn't FAILED.
   */
  Throwable failureCause();
}