001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.UUID; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.TimeoutException; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.TableDescriptors; 030import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; 031import org.apache.hadoop.hbase.wal.WAL.Entry; 032import org.apache.yetus.audience.InterfaceAudience; 033 034/** 035 * ReplicationEndpoint is a plugin which implements replication to other HBase clusters, or other 036 * systems. ReplicationEndpoint implementation can be specified at the peer creation time by 037 * specifying it in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread in 038 * each region server in the same process. 039 * <p> 040 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer relation. 041 * ReplicationSource is an HBase-private class which tails the logs and manages the queue of logs 042 * plus management and persistence of all the state for replication. ReplicationEndpoint on the 043 * other hand is responsible for doing the actual shipping and persisting of the WAL entries in the 044 * other cluster. 045 */ 046@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 047public interface ReplicationEndpoint extends ReplicationPeerConfigListener { 048 // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop. 049 // How they relate? Do we #start before #init(Context)? We fail fast if you don't? 050 051 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 052 class Context { 053 private final Configuration localConf; 054 private final Configuration conf; 055 private final FileSystem fs; 056 private final TableDescriptors tableDescriptors; 057 private final ReplicationPeer replicationPeer; 058 private final String peerId; 059 private final UUID clusterId; 060 private final MetricsSource metrics; 061 private final Abortable abortable; 062 063 @InterfaceAudience.Private 064 public Context(final Configuration localConf, final Configuration conf, final FileSystem fs, 065 final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer, 066 final MetricsSource metrics, final TableDescriptors tableDescriptors, 067 final Abortable abortable) { 068 this.localConf = localConf; 069 this.conf = conf; 070 this.fs = fs; 071 this.clusterId = clusterId; 072 this.peerId = peerId; 073 this.replicationPeer = replicationPeer; 074 this.metrics = metrics; 075 this.tableDescriptors = tableDescriptors; 076 this.abortable = abortable; 077 } 078 079 public Configuration getConfiguration() { 080 return conf; 081 } 082 083 public Configuration getLocalConfiguration() { 084 return localConf; 085 } 086 087 public FileSystem getFilesystem() { 088 return fs; 089 } 090 091 public UUID getClusterId() { 092 return clusterId; 093 } 094 095 public String getPeerId() { 096 return peerId; 097 } 098 099 public ReplicationPeerConfig getPeerConfig() { 100 return replicationPeer.getPeerConfig(); 101 } 102 103 public ReplicationPeer getReplicationPeer() { 104 return replicationPeer; 105 } 106 107 public MetricsSource getMetrics() { 108 return metrics; 109 } 110 111 public TableDescriptors getTableDescriptors() { 112 return tableDescriptors; 113 } 114 115 public Abortable getAbortable() { 116 return abortable; 117 } 118 } 119 120 /** 121 * Initialize the replication endpoint with the given context. 122 * @param context replication context 123 * @throws IOException error occur when initialize the endpoint. 124 */ 125 void init(Context context) throws IOException; 126 127 /** 128 * Whether or not, the replication endpoint can replicate to it's source cluster with the same 129 * UUID 130 */ 131 boolean canReplicateToSameCluster(); 132 133 /** 134 * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted associated 135 * UUID. If the replication is not performed to an actual HBase cluster (but some other system), 136 * the UUID returned has to uniquely identify the connected target system. 137 * @return a UUID or null if the peer cluster does not exist or is not connected. 138 */ 139 UUID getPeerUUID(); 140 141 /** 142 * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication 143 * infrastructure will call this filter before sending the edits to shipEdits(). 144 * @return a {@link WALEntryFilter} or null. 145 */ 146 WALEntryFilter getWALEntryfilter(); 147 148 /** 149 * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method. 150 */ 151 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) 152 static class ReplicateContext { 153 List<Entry> entries; 154 int size; 155 String walGroupId; 156 int timeout; 157 158 @InterfaceAudience.Private 159 public ReplicateContext() { 160 } 161 162 public ReplicateContext setEntries(List<Entry> entries) { 163 this.entries = entries; 164 return this; 165 } 166 167 public ReplicateContext setSize(int size) { 168 this.size = size; 169 return this; 170 } 171 172 public ReplicateContext setWalGroupId(String walGroupId) { 173 this.walGroupId = walGroupId; 174 return this; 175 } 176 177 public List<Entry> getEntries() { 178 return entries; 179 } 180 181 public int getSize() { 182 return size; 183 } 184 185 public String getWalGroupId() { 186 return walGroupId; 187 } 188 189 public void setTimeout(int timeout) { 190 this.timeout = timeout; 191 } 192 193 public int getTimeout() { 194 return this.timeout; 195 } 196 } 197 198 /** 199 * Replicate the given set of entries (in the context) to the other cluster. Can block until all 200 * the given entries are replicated. Upon this method is returned, all entries that were passed in 201 * the context are assumed to be persisted in the target cluster. 202 * @param replicateContext a context where WAL entries and other parameters can be obtained. 203 */ 204 boolean replicate(ReplicateContext replicateContext); 205 206 // The below methods are inspired by Guava Service. See 207 // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service. 208 // Below we implement a subset only with different names on some methods so we can implement 209 // the below internally using Guava (without exposing our implementation to 210 // ReplicationEndpoint implementors. 211 212 /** 213 * Returns {@code true} if this service is RUNNING. 214 */ 215 boolean isRunning(); 216 217 /** Returns Return {@code true} is this service is STARTING (but not yet RUNNING). */ 218 boolean isStarting(); 219 220 /** 221 * Initiates service startup and returns immediately. A stopped service may not be restarted. 222 * Equivalent of startAsync call in Guava Service. 223 * @throws IllegalStateException if the service is not new, if it has been run already. 224 */ 225 void start(); 226 227 /** 228 * Waits for the {@link ReplicationEndpoint} to be up and running. 229 * @throws IllegalStateException if the service reaches a state from which it is not possible to 230 * enter the (internal) running state. e.g. if the state is 231 * terminated when this method is called then this will throw an 232 * IllegalStateException. 233 */ 234 void awaitRunning(); 235 236 /** 237 * Waits for the {@link ReplicationEndpoint} to to be up and running for no more than the given 238 * time. 239 * @param timeout the maximum time to wait 240 * @param unit the time unit of the timeout argument 241 * @throws TimeoutException if the service has not reached the given state within the 242 * deadline 243 * @throws IllegalStateException if the service reaches a state from which it is not possible to 244 * enter the (internal) running state. e.g. if the state is 245 * terminated when this method is called then this will throw an 246 * IllegalStateException. 247 */ 248 void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException; 249 250 /** 251 * If the service is starting or running, this initiates service shutdown and returns immediately. 252 * If the service has already been stopped, this method returns immediately without taking action. 253 * Equivalent of stopAsync call in Guava Service. 254 */ 255 void stop(); 256 257 /** 258 * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state. 259 * @throws IllegalStateException if the service FAILED. 260 */ 261 void awaitTerminated(); 262 263 /** 264 * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no more than the given 265 * time. 266 * @param timeout the maximum time to wait 267 * @param unit the time unit of the timeout argument 268 * @throws TimeoutException if the service has not reached the given state within the 269 * deadline 270 * @throws IllegalStateException if the service FAILED. 271 */ 272 void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException; 273 274 /** 275 * Returns the {@link Throwable} that caused this service to fail. 276 * @throws IllegalStateException if this service's state isn't FAILED. 277 */ 278 Throwable failureCause(); 279}