001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.UUID;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.TimeoutException;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.hadoop.hbase.TableDescriptors;
030import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
031import org.apache.hadoop.hbase.wal.WAL.Entry;
032import org.apache.yetus.audience.InterfaceAudience;
033
034/**
035 * ReplicationEndpoint is a plugin which implements replication to other HBase clusters, or other
036 * systems. ReplicationEndpoint implementation can be specified at the peer creation time by
037 * specifying it in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread in
038 * each region server in the same process.
039 * <p>
040 * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer relation.
041 * ReplicationSource is an HBase-private class which tails the logs and manages the queue of logs
042 * plus management and persistence of all the state for replication. ReplicationEndpoint on the
043 * other hand is responsible for doing the actual shipping and persisting of the WAL entries in the
044 * other cluster.
045 */
046@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
047public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
048  // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop.
049  // How they relate? Do we #start before #init(Context)? We fail fast if you don't?
050
051  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
052  class Context {
053    private final Configuration localConf;
054    private final Configuration conf;
055    private final FileSystem fs;
056    private final TableDescriptors tableDescriptors;
057    private final ReplicationPeer replicationPeer;
058    private final String peerId;
059    private final UUID clusterId;
060    private final MetricsSource metrics;
061    private final Abortable abortable;
062
063    @InterfaceAudience.Private
064    public Context(final Configuration localConf, final Configuration conf, final FileSystem fs,
065      final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer,
066      final MetricsSource metrics, final TableDescriptors tableDescriptors,
067      final Abortable abortable) {
068      this.localConf = localConf;
069      this.conf = conf;
070      this.fs = fs;
071      this.clusterId = clusterId;
072      this.peerId = peerId;
073      this.replicationPeer = replicationPeer;
074      this.metrics = metrics;
075      this.tableDescriptors = tableDescriptors;
076      this.abortable = abortable;
077    }
078
079    public Configuration getConfiguration() {
080      return conf;
081    }
082
083    public Configuration getLocalConfiguration() {
084      return localConf;
085    }
086
087    public FileSystem getFilesystem() {
088      return fs;
089    }
090
091    public UUID getClusterId() {
092      return clusterId;
093    }
094
095    public String getPeerId() {
096      return peerId;
097    }
098
099    public ReplicationPeerConfig getPeerConfig() {
100      return replicationPeer.getPeerConfig();
101    }
102
103    public ReplicationPeer getReplicationPeer() {
104      return replicationPeer;
105    }
106
107    public MetricsSource getMetrics() {
108      return metrics;
109    }
110
111    public TableDescriptors getTableDescriptors() {
112      return tableDescriptors;
113    }
114
115    public Abortable getAbortable() {
116      return abortable;
117    }
118  }
119
120  /**
121   * Initialize the replication endpoint with the given context.
122   * @param context replication context
123   * @throws IOException error occur when initialize the endpoint.
124   */
125  void init(Context context) throws IOException;
126
127  /**
128   * Whether or not, the replication endpoint can replicate to it's source cluster with the same
129   * UUID
130   */
131  boolean canReplicateToSameCluster();
132
133  /**
134   * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted associated
135   * UUID. If the replication is not performed to an actual HBase cluster (but some other system),
136   * the UUID returned has to uniquely identify the connected target system.
137   * @return a UUID or null if the peer cluster does not exist or is not connected.
138   */
139  UUID getPeerUUID();
140
141  /**
142   * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
143   * infrastructure will call this filter before sending the edits to shipEdits().
144   * @return a {@link WALEntryFilter} or null.
145   */
146  WALEntryFilter getWALEntryfilter();
147
148  /**
149   * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
150   */
151  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
152  static class ReplicateContext {
153    List<Entry> entries;
154    int size;
155    String walGroupId;
156    int timeout;
157
158    @InterfaceAudience.Private
159    public ReplicateContext() {
160    }
161
162    public ReplicateContext setEntries(List<Entry> entries) {
163      this.entries = entries;
164      return this;
165    }
166
167    public ReplicateContext setSize(int size) {
168      this.size = size;
169      return this;
170    }
171
172    public ReplicateContext setWalGroupId(String walGroupId) {
173      this.walGroupId = walGroupId;
174      return this;
175    }
176
177    public List<Entry> getEntries() {
178      return entries;
179    }
180
181    public int getSize() {
182      return size;
183    }
184
185    public String getWalGroupId() {
186      return walGroupId;
187    }
188
189    public void setTimeout(int timeout) {
190      this.timeout = timeout;
191    }
192
193    public int getTimeout() {
194      return this.timeout;
195    }
196  }
197
198  /**
199   * Replicate the given set of entries (in the context) to the other cluster. Can block until all
200   * the given entries are replicated. Upon this method is returned, all entries that were passed in
201   * the context are assumed to be persisted in the target cluster.
202   * @param replicateContext a context where WAL entries and other parameters can be obtained.
203   */
204  boolean replicate(ReplicateContext replicateContext);
205
206  // The below methods are inspired by Guava Service. See
207  // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
208  // Below we implement a subset only with different names on some methods so we can implement
209  // the below internally using Guava (without exposing our implementation to
210  // ReplicationEndpoint implementors.
211
212  /**
213   * Returns {@code true} if this service is RUNNING.
214   */
215  boolean isRunning();
216
217  /** Returns Return {@code true} is this service is STARTING (but not yet RUNNING). */
218  boolean isStarting();
219
220  /**
221   * Initiates service startup and returns immediately. A stopped service may not be restarted.
222   * Equivalent of startAsync call in Guava Service.
223   * @throws IllegalStateException if the service is not new, if it has been run already.
224   */
225  void start();
226
227  /**
228   * Waits for the {@link ReplicationEndpoint} to be up and running.
229   * @throws IllegalStateException if the service reaches a state from which it is not possible to
230   *                               enter the (internal) running state. e.g. if the state is
231   *                               terminated when this method is called then this will throw an
232   *                               IllegalStateException.
233   */
234  void awaitRunning();
235
236  /**
237   * Waits for the {@link ReplicationEndpoint} to to be up and running for no more than the given
238   * time.
239   * @param timeout the maximum time to wait
240   * @param unit    the time unit of the timeout argument
241   * @throws TimeoutException      if the service has not reached the given state within the
242   *                               deadline
243   * @throws IllegalStateException if the service reaches a state from which it is not possible to
244   *                               enter the (internal) running state. e.g. if the state is
245   *                               terminated when this method is called then this will throw an
246   *                               IllegalStateException.
247   */
248  void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
249
250  /**
251   * If the service is starting or running, this initiates service shutdown and returns immediately.
252   * If the service has already been stopped, this method returns immediately without taking action.
253   * Equivalent of stopAsync call in Guava Service.
254   */
255  void stop();
256
257  /**
258   * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state.
259   * @throws IllegalStateException if the service FAILED.
260   */
261  void awaitTerminated();
262
263  /**
264   * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no more than the given
265   * time.
266   * @param timeout the maximum time to wait
267   * @param unit    the time unit of the timeout argument
268   * @throws TimeoutException      if the service has not reached the given state within the
269   *                               deadline
270   * @throws IllegalStateException if the service FAILED.
271   */
272  void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
273
274  /**
275   * Returns the {@link Throwable} that caused this service to fail.
276   * @throws IllegalStateException if this service's state isn't FAILED.
277   */
278  Throwable failureCause();
279}