001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.UUID; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.TimeoutException; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.Server; 030import org.apache.hadoop.hbase.TableDescriptors; 031import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; 032import org.apache.hadoop.hbase.wal.WAL.Entry; 033import org.apache.yetus.audience.InterfaceAudience; 034 035/** 036 * ReplicationEndpoint is a plugin which implements replication to other HBase clusters, or other 037 * systems. ReplicationEndpoint implementation can be specified at the peer creation time by 038 * specifying it in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread in 039 * each region server in the same process. 040 * <p> 041 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer relation. 042 * ReplicationSource is an HBase-private class which tails the logs and manages the queue of logs 043 * plus management and persistence of all the state for replication. ReplicationEndpoint on the 044 * other hand is responsible for doing the actual shipping and persisting of the WAL entries in the 045 * other cluster. 046 */ 047@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 048public interface ReplicationEndpoint extends ReplicationPeerConfigListener { 049 // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop. 050 // How they relate? Do we #start before #init(Context)? We fail fast if you don't? 051 052 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 053 class Context { 054 private final Server server; 055 private final Configuration localConf; 056 private final Configuration conf; 057 private final FileSystem fs; 058 private final TableDescriptors tableDescriptors; 059 private final ReplicationPeer replicationPeer; 060 private final String peerId; 061 private final UUID clusterId; 062 private final MetricsSource metrics; 063 private final Abortable abortable; 064 065 @InterfaceAudience.Private 066 public Context(final Server server, final Configuration localConf, final Configuration conf, 067 final FileSystem fs, final String peerId, final UUID clusterId, 068 final ReplicationPeer replicationPeer, final MetricsSource metrics, 069 final TableDescriptors tableDescriptors, final Abortable abortable) { 070 this.server = server; 071 this.localConf = localConf; 072 this.conf = conf; 073 this.fs = fs; 074 this.clusterId = clusterId; 075 this.peerId = peerId; 076 this.replicationPeer = replicationPeer; 077 this.metrics = metrics; 078 this.tableDescriptors = tableDescriptors; 079 this.abortable = abortable; 080 } 081 082 public Server getServer() { 083 return server; 084 } 085 086 public Configuration getConfiguration() { 087 return conf; 088 } 089 090 public Configuration getLocalConfiguration() { 091 return localConf; 092 } 093 094 public FileSystem getFilesystem() { 095 return fs; 096 } 097 098 public UUID getClusterId() { 099 return clusterId; 100 } 101 102 public String getPeerId() { 103 return peerId; 104 } 105 106 public ReplicationPeerConfig getPeerConfig() { 107 return replicationPeer.getPeerConfig(); 108 } 109 110 public ReplicationPeer getReplicationPeer() { 111 return replicationPeer; 112 } 113 114 public MetricsSource getMetrics() { 115 return metrics; 116 } 117 118 public TableDescriptors getTableDescriptors() { 119 return tableDescriptors; 120 } 121 122 public Abortable getAbortable() { 123 return abortable; 124 } 125 } 126 127 /** 128 * Initialize the replication endpoint with the given context. 129 * @param context replication context 130 * @throws IOException error occur when initialize the endpoint. 131 */ 132 void init(Context context) throws IOException; 133 134 /** 135 * Whether or not, the replication endpoint can replicate to it's source cluster with the same 136 * UUID 137 */ 138 boolean canReplicateToSameCluster(); 139 140 /** 141 * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted associated 142 * UUID. If the replication is not performed to an actual HBase cluster (but some other system), 143 * the UUID returned has to uniquely identify the connected target system. 144 * @return a UUID or null if the peer cluster does not exist or is not connected. 145 */ 146 UUID getPeerUUID(); 147 148 /** 149 * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication 150 * infrastructure will call this filter before sending the edits to shipEdits(). 151 * @return a {@link WALEntryFilter} or null. 152 */ 153 WALEntryFilter getWALEntryfilter(); 154 155 /** 156 * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method. 157 */ 158 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 159 static class ReplicateContext { 160 List<Entry> entries; 161 int size; 162 String walGroupId; 163 int timeout; 164 165 @InterfaceAudience.Private 166 public ReplicateContext() { 167 } 168 169 public ReplicateContext setEntries(List<Entry> entries) { 170 this.entries = entries; 171 return this; 172 } 173 174 public ReplicateContext setSize(int size) { 175 this.size = size; 176 return this; 177 } 178 179 public ReplicateContext setWalGroupId(String walGroupId) { 180 this.walGroupId = walGroupId; 181 return this; 182 } 183 184 public List<Entry> getEntries() { 185 return entries; 186 } 187 188 public int getSize() { 189 return size; 190 } 191 192 public String getWalGroupId() { 193 return walGroupId; 194 } 195 196 public void setTimeout(int timeout) { 197 this.timeout = timeout; 198 } 199 200 public int getTimeout() { 201 return this.timeout; 202 } 203 } 204 205 /** 206 * Replicate the given set of entries (in the context) to the other cluster. Can block until all 207 * the given entries are replicated. Upon this method is returned, all entries that were passed in 208 * the context are assumed to be persisted in the target cluster. 209 * @param replicateContext a context where WAL entries and other parameters can be obtained. 210 */ 211 boolean replicate(ReplicateContext replicateContext); 212 213 // The below methods are inspired by Guava Service. See 214 // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service. 215 // Below we implement a subset only with different names on some methods so we can implement 216 // the below internally using Guava (without exposing our implementation to 217 // ReplicationEndpoint implementors. 218 219 /** 220 * Returns {@code true} if this service is RUNNING. 221 */ 222 boolean isRunning(); 223 224 /** Returns Return {@code true} is this service is STARTING (but not yet RUNNING). */ 225 boolean isStarting(); 226 227 /** 228 * Initiates service startup and returns immediately. A stopped service may not be restarted. 229 * Equivalent of startAsync call in Guava Service. 230 * @throws IllegalStateException if the service is not new, if it has been run already. 231 */ 232 void start(); 233 234 /** 235 * Waits for the {@link ReplicationEndpoint} to be up and running. 236 * @throws IllegalStateException if the service reaches a state from which it is not possible to 237 * enter the (internal) running state. e.g. if the state is 238 * terminated when this method is called then this will throw an 239 * IllegalStateException. 240 */ 241 void awaitRunning(); 242 243 /** 244 * Waits for the {@link ReplicationEndpoint} to to be up and running for no more than the given 245 * time. 246 * @param timeout the maximum time to wait 247 * @param unit the time unit of the timeout argument 248 * @throws TimeoutException if the service has not reached the given state within the 249 * deadline 250 * @throws IllegalStateException if the service reaches a state from which it is not possible to 251 * enter the (internal) running state. e.g. if the state is 252 * terminated when this method is called then this will throw an 253 * IllegalStateException. 254 */ 255 void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException; 256 257 /** 258 * If the service is starting or running, this initiates service shutdown and returns immediately. 259 * If the service has already been stopped, this method returns immediately without taking action. 260 * Equivalent of stopAsync call in Guava Service. 261 */ 262 void stop(); 263 264 /** 265 * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state. 266 * @throws IllegalStateException if the service FAILED. 267 */ 268 void awaitTerminated(); 269 270 /** 271 * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no more than the given 272 * time. 273 * @param timeout the maximum time to wait 274 * @param unit the time unit of the timeout argument 275 * @throws TimeoutException if the service has not reached the given state within the 276 * deadline 277 * @throws IllegalStateException if the service FAILED. 278 */ 279 void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException; 280 281 /** 282 * Returns the {@link Throwable} that caused this service to fail. 283 * @throws IllegalStateException if this service's state isn't FAILED. 284 */ 285 Throwable failureCause(); 286}