001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.UUID; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.TimeoutException; 026 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.hbase.HBaseInterfaceAudience; 032import org.apache.hadoop.hbase.TableDescriptors; 033import org.apache.hadoop.hbase.wal.WAL.Entry; 034import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; 035 036/** 037 * ReplicationEndpoint is a plugin which implements replication 038 * to other HBase clusters, or other systems. ReplicationEndpoint implementation 039 * can be specified at the peer creation time by specifying it 040 * in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread 041 * in each region server in the same process. 042 * <p> 043 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer 044 * relation. ReplicationSource is an HBase-private class which tails the logs and manages 045 * the queue of logs plus management and persistence of all the state for replication. 046 * ReplicationEndpoint on the other hand is responsible for doing the actual shipping 047 * and persisting of the WAL entries in the other cluster. 048 */ 049@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 050public interface ReplicationEndpoint extends ReplicationPeerConfigListener { 051 // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop. 052 // How they relate? Do we #start before #init(Context)? We fail fast if you don't? 053 054 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 055 class Context { 056 private final Configuration localConf; 057 private final Configuration conf; 058 private final FileSystem fs; 059 private final TableDescriptors tableDescriptors; 060 private final ReplicationPeer replicationPeer; 061 private final String peerId; 062 private final UUID clusterId; 063 private final MetricsSource metrics; 064 private final Abortable abortable; 065 066 @InterfaceAudience.Private 067 public Context( 068 final Configuration localConf, 069 final Configuration conf, 070 final FileSystem fs, 071 final String peerId, 072 final UUID clusterId, 073 final ReplicationPeer replicationPeer, 074 final MetricsSource metrics, 075 final TableDescriptors tableDescriptors, 076 final Abortable abortable) { 077 this.localConf = localConf; 078 this.conf = conf; 079 this.fs = fs; 080 this.clusterId = clusterId; 081 this.peerId = peerId; 082 this.replicationPeer = replicationPeer; 083 this.metrics = metrics; 084 this.tableDescriptors = tableDescriptors; 085 this.abortable = abortable; 086 } 087 public Configuration getConfiguration() { 088 return conf; 089 } 090 public Configuration getLocalConfiguration() { 091 return localConf; 092 } 093 public FileSystem getFilesystem() { 094 return fs; 095 } 096 public UUID getClusterId() { 097 return clusterId; 098 } 099 public String getPeerId() { 100 return peerId; 101 } 102 public ReplicationPeerConfig getPeerConfig() { 103 return replicationPeer.getPeerConfig(); 104 } 105 public ReplicationPeer getReplicationPeer() { 106 return replicationPeer; 107 } 108 public MetricsSource getMetrics() { 109 return metrics; 110 } 111 public TableDescriptors getTableDescriptors() { 112 return tableDescriptors; 113 } 114 public Abortable getAbortable() { return abortable; } 115 } 116 117 /** 118 * Initialize the replication endpoint with the given context. 119 * @param context replication context 120 * @throws IOException error occur when initialize the endpoint. 121 */ 122 void init(Context context) throws IOException; 123 124 /** Whether or not, the replication endpoint can replicate to it's source cluster with the same 125 * UUID */ 126 boolean canReplicateToSameCluster(); 127 128 /** 129 * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted 130 * associated UUID. If the replication is not performed to an actual HBase cluster (but 131 * some other system), the UUID returned has to uniquely identify the connected target system. 132 * @return a UUID or null if the peer cluster does not exist or is not connected. 133 */ 134 UUID getPeerUUID(); 135 136 /** 137 * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication 138 * infrastructure will call this filter before sending the edits to shipEdits(). 139 * @return a {@link WALEntryFilter} or null. 140 */ 141 WALEntryFilter getWALEntryfilter(); 142 143 /** 144 * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method. 145 */ 146 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 147 static class ReplicateContext { 148 List<Entry> entries; 149 int size; 150 String walGroupId; 151 int timeout; 152 @InterfaceAudience.Private 153 public ReplicateContext() { 154 } 155 156 public ReplicateContext setEntries(List<Entry> entries) { 157 this.entries = entries; 158 return this; 159 } 160 public ReplicateContext setSize(int size) { 161 this.size = size; 162 return this; 163 } 164 public ReplicateContext setWalGroupId(String walGroupId) { 165 this.walGroupId = walGroupId; 166 return this; 167 } 168 public List<Entry> getEntries() { 169 return entries; 170 } 171 public int getSize() { 172 return size; 173 } 174 public String getWalGroupId(){ 175 return walGroupId; 176 } 177 public void setTimeout(int timeout) { 178 this.timeout = timeout; 179 } 180 public int getTimeout() { 181 return this.timeout; 182 } 183 } 184 185 /** 186 * Replicate the given set of entries (in the context) to the other cluster. 187 * Can block until all the given entries are replicated. Upon this method is returned, 188 * all entries that were passed in the context are assumed to be persisted in the 189 * target cluster. 190 * @param replicateContext a context where WAL entries and other 191 * parameters can be obtained. 192 */ 193 boolean replicate(ReplicateContext replicateContext); 194 195 196 // The below methods are inspired by Guava Service. See 197 // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service. 198 // Below we implement a subset only with different names on some methods so we can implement 199 // the below internally using Guava (without exposing our implementation to 200 // ReplicationEndpoint implementors. 201 202 /** 203 * Returns {@code true} if this service is RUNNING. 204 */ 205 boolean isRunning(); 206 207 /** 208 * @return Return {@code true} is this service is STARTING (but not yet RUNNING). 209 */ 210 boolean isStarting(); 211 212 /** 213 * Initiates service startup and returns immediately. A stopped service may not be restarted. 214 * Equivalent of startAsync call in Guava Service. 215 * @throws IllegalStateException if the service is not new, if it has been run already. 216 */ 217 void start(); 218 219 /** 220 * Waits for the {@link ReplicationEndpoint} to be up and running. 221 * 222 * @throws IllegalStateException if the service reaches a state from which it is not possible to 223 * enter the (internal) running state. e.g. if the state is terminated when this method is 224 * called then this will throw an IllegalStateException. 225 */ 226 void awaitRunning(); 227 228 /** 229 * Waits for the {@link ReplicationEndpoint} to to be up and running for no more 230 * than the given time. 231 * 232 * @param timeout the maximum time to wait 233 * @param unit the time unit of the timeout argument 234 * @throws TimeoutException if the service has not reached the given state within the deadline 235 * @throws IllegalStateException if the service reaches a state from which it is not possible to 236 * enter the (internal) running state. e.g. if the state is terminated when this method is 237 * called then this will throw an IllegalStateException. 238 */ 239 void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException; 240 241 /** 242 * If the service is starting or running, this initiates service shutdown and returns immediately. 243 * If the service has already been stopped, this method returns immediately without taking action. 244 * Equivalent of stopAsync call in Guava Service. 245 */ 246 void stop(); 247 248 /** 249 * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state. 250 * 251 * @throws IllegalStateException if the service FAILED. 252 */ 253 void awaitTerminated(); 254 255 /** 256 * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no 257 * more than the given time. 258 * 259 * @param timeout the maximum time to wait 260 * @param unit the time unit of the timeout argument 261 * @throws TimeoutException if the service has not reached the given state within the deadline 262 * @throws IllegalStateException if the service FAILED. 263 */ 264 void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException; 265 266 /** 267 * Returns the {@link Throwable} that caused this service to fail. 268 * 269 * @throws IllegalStateException if this service's state isn't FAILED. 270 */ 271 Throwable failureCause(); 272}