001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.TableDescriptors;
033import org.apache.hadoop.hbase.wal.WAL.Entry;
034import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
035
036/**
037 * ReplicationEndpoint is a plugin which implements replication
038 * to other HBase clusters, or other systems. ReplicationEndpoint implementation
039 * can be specified at the peer creation time by specifying it
040 * in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread
041 * in each region server in the same process.
042 * <p>
043 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
044 * relation. ReplicationSource is an HBase-private class which tails the logs and manages
045 * the queue of logs plus management and persistence of all the state for replication.
046 * ReplicationEndpoint on the other hand is responsible for doing the actual shipping
047 * and persisting of the WAL entries in the other cluster.
048 */
049@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
050public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
051  // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop.
052  // How they relate? Do we #start before #init(Context)? We fail fast if you don't?
053
054  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
055  class Context {
056    private final Configuration localConf;
057    private final Configuration conf;
058    private final FileSystem fs;
059    private final TableDescriptors tableDescriptors;
060    private final ReplicationPeer replicationPeer;
061    private final String peerId;
062    private final UUID clusterId;
063    private final MetricsSource metrics;
064    private final Abortable abortable;
065
066    @InterfaceAudience.Private
067    public Context(
068        final Configuration localConf,
069        final Configuration conf,
070        final FileSystem fs,
071        final String peerId,
072        final UUID clusterId,
073        final ReplicationPeer replicationPeer,
074        final MetricsSource metrics,
075        final TableDescriptors tableDescriptors,
076        final Abortable abortable) {
077      this.localConf = localConf;
078      this.conf = conf;
079      this.fs = fs;
080      this.clusterId = clusterId;
081      this.peerId = peerId;
082      this.replicationPeer = replicationPeer;
083      this.metrics = metrics;
084      this.tableDescriptors = tableDescriptors;
085      this.abortable = abortable;
086    }
087    public Configuration getConfiguration() {
088      return conf;
089    }
090    public Configuration getLocalConfiguration() {
091      return localConf;
092    }
093    public FileSystem getFilesystem() {
094      return fs;
095    }
096    public UUID getClusterId() {
097      return clusterId;
098    }
099    public String getPeerId() {
100      return peerId;
101    }
102    public ReplicationPeerConfig getPeerConfig() {
103      return replicationPeer.getPeerConfig();
104    }
105    public ReplicationPeer getReplicationPeer() {
106      return replicationPeer;
107    }
108    public MetricsSource getMetrics() {
109      return metrics;
110    }
111    public TableDescriptors getTableDescriptors() {
112      return tableDescriptors;
113    }
114    public Abortable getAbortable() { return abortable; }
115  }
116
117  /**
118   * Initialize the replication endpoint with the given context.
119   * @param context replication context
120   * @throws IOException error occur when initialize the endpoint.
121   */
122  void init(Context context) throws IOException;
123
124  /** Whether or not, the replication endpoint can replicate to it's source cluster with the same
125   * UUID */
126  boolean canReplicateToSameCluster();
127
128  /**
129   * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted
130   * associated UUID. If the replication is not performed to an actual HBase cluster (but
131   * some other system), the UUID returned has to uniquely identify the connected target system.
132   * @return a UUID or null if the peer cluster does not exist or is not connected.
133   */
134  UUID getPeerUUID();
135
136  /**
137   * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
138   * infrastructure will call this filter before sending the edits to shipEdits().
139   * @return a {@link WALEntryFilter} or null.
140   */
141  WALEntryFilter getWALEntryfilter();
142
143  /**
144   * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
145   */
146  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
147  static class ReplicateContext {
148    List<Entry> entries;
149    int size;
150    String walGroupId;
151    int timeout;
152    @InterfaceAudience.Private
153    public ReplicateContext() {
154    }
155
156    public ReplicateContext setEntries(List<Entry> entries) {
157      this.entries = entries;
158      return this;
159    }
160    public ReplicateContext setSize(int size) {
161      this.size = size;
162      return this;
163    }
164    public ReplicateContext setWalGroupId(String walGroupId) {
165      this.walGroupId = walGroupId;
166      return this;
167    }
168    public List<Entry> getEntries() {
169      return entries;
170    }
171    public int getSize() {
172      return size;
173    }
174    public String getWalGroupId(){
175      return walGroupId;
176    }
177    public void setTimeout(int timeout) {
178      this.timeout = timeout;
179    }
180    public int getTimeout() {
181      return this.timeout;
182    }
183  }
184
185  /**
186   * Replicate the given set of entries (in the context) to the other cluster.
187   * Can block until all the given entries are replicated. Upon this method is returned,
188   * all entries that were passed in the context are assumed to be persisted in the
189   * target cluster.
190   * @param replicateContext a context where WAL entries and other
191   * parameters can be obtained.
192   */
193  boolean replicate(ReplicateContext replicateContext);
194
195
196  // The below methods are inspired by Guava Service. See
197  // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
198  // Below we implement a subset only with different names on some methods so we can implement
199  // the below internally using Guava (without exposing our implementation to
200  // ReplicationEndpoint implementors.
201
202  /**
203   * Returns {@code true} if this service is RUNNING.
204   */
205  boolean isRunning();
206
207  /**
208   * @return Return {@code true} is this service is STARTING (but not yet RUNNING).
209   */
210  boolean isStarting();
211
212  /**
213   * Initiates service startup and returns immediately. A stopped service may not be restarted.
214   * Equivalent of startAsync call in Guava Service.
215   * @throws IllegalStateException if the service is not new, if it has been run already.
216   */
217  void start();
218
219  /**
220   * Waits for the {@link ReplicationEndpoint} to be up and running.
221   *
222   * @throws IllegalStateException if the service reaches a state from which it is not possible to
223   *     enter the (internal) running state. e.g. if the state is terminated when this method is
224   *     called then this will throw an IllegalStateException.
225   */
226  void awaitRunning();
227
228  /**
229   * Waits for the {@link ReplicationEndpoint} to to be up and running for no more
230   * than the given time.
231   *
232   * @param timeout the maximum time to wait
233   * @param unit the time unit of the timeout argument
234   * @throws TimeoutException if the service has not reached the given state within the deadline
235   * @throws IllegalStateException if the service reaches a state from which it is not possible to
236   *     enter the (internal) running state. e.g. if the state is terminated when this method is
237   *     called then this will throw an IllegalStateException.
238   */
239  void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
240
241  /**
242   * If the service is starting or running, this initiates service shutdown and returns immediately.
243   * If the service has already been stopped, this method returns immediately without taking action.
244   * Equivalent of stopAsync call in Guava Service.
245   */
246  void stop();
247
248  /**
249   * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state.
250   *
251   * @throws IllegalStateException if the service FAILED.
252   */
253  void awaitTerminated();
254
255  /**
256   * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no
257   * more than the given time.
258   *
259   * @param timeout the maximum time to wait
260   * @param unit the time unit of the timeout argument
261   * @throws TimeoutException if the service has not reached the given state within the deadline
262   * @throws IllegalStateException if the service FAILED.
263   */
264  void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
265
266  /**
267   * Returns the {@link Throwable} that caused this service to fail.
268   *
269   * @throws IllegalStateException if this service's state isn't FAILED.
270   */
271  Throwable failureCause();
272}