/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random set of region servers to act as
 * sinks, using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
      LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  private ClusterConnection conn;
  private Configuration localConf;
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private ThreadPoolExecutor exec;
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private Abortable abortable;
  private boolean dropOnDeletedTables;

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
    // tasks to terminate when doStop() is called.
    long maxTerminationWaitMultiplier = this.conf.getLong(
        "replication.source.maxterminationmultiplier",
        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
    this.maxTerminationWait = maxTerminationWaitMultiplier *
        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // TODO: Make this connection particular to replication, with replication-specific settings
    // such as the compression or codec to use when passing Cells.
    this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    this.exec.allowCoreThreadTimeOut(true);
    this.abortable = ctx.getAbortable();
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we have an accurate estimate of encoded size. Being
    // conservative for now.
    this.replicationRpcLimit = (int)(0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
      RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables =
        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);

    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = FSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
  }

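  /**
   * Copies the replication codec, if one is configured, into the RPC codec setting of the peer
   * connection configuration so that WAL entries are shipped with that codec.
   */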
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

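  /**
   * Asks the sink manager to choose sinks from the peer cluster's region servers, retrying with
   * backoff until at least one sink is available or this endpoint stops running.
   */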
  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

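  /**
   * Divides the entries into batches so that all edits for a region land in the same batch
   * (hashed by encoded region name), preserving per-region ordering, and splits any batch that
   * would exceed the replication RPC size limit.
   */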
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
    // Maintains the current batch for a given partition index
    Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
    List<List<Entry>> entryLists = new ArrayList<>();
    int[] sizes = new int[n];

    for (int i = 0; i < n; i++) {
      entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1));
    }

    for (Entry e: entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n);
      int entrySize = (int)e.getKey().estimatedSerializedSizeOf() +
          (int)e.getEdit().estimatedSerializedSizeOf();
      // If this batch is oversized, add it to final list and initialize a new empty batch
      if (sizes[index] > 0 /* must include at least one entry */ &&
          sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryMap.get(index));
        entryMap.put(index, new ArrayList<Entry>());
        sizes[index] = 0;
      }
      entryMap.get(index).add(e);
      sizes[index] += entrySize;
    }

    entryLists.addAll(entryMap.values());
    return entryLists;
  }

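  /**
   * Best-effort extraction of the table name from a TableNotFoundException message returned by
   * the sink. Returns null if no legal table name can be parsed from the message.
   */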
  private TableName parseTable(String msg) {
    // ... TableNotFoundException: '<table>'\n...
    Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");
    Matcher m = p.matcher(msg);
    if (m.find()) {
      String table = m.group(1);
      try {
        // double check that table is a valid table name
        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
        return TableName.valueOf(table);
      } catch (IllegalArgumentException ignore) {
      }
    }
    return null;
  }

  // Remove all edits for the given table from each batch; edits for other tables are kept.
  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
    List<List<Entry>> entryLists = new ArrayList<>();
    for (List<Entry> entries : oldEntryList) {
      ArrayList<Entry> thisList = new ArrayList<Entry>(entries.size());
      entryLists.add(thisList);
      for (Entry e : entries) {
        if (!e.getKey().getTableName().equals(table)) {
          thisList.add(e);
        }
      }
    }
    return entryLists;
  }

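  /**
   * Attempts to open a fresh connection to the peer cluster; the existing (possibly closed)
   * connection is kept if creating a new one fails.
   */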
  private void reconnectToPeerCluster() {
    ClusterConnection connection = null;
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("Failed to create connection for peer cluster", ioe);
    }
    if (connection != null) {
      this.conn = connection;
    }
  }

  /**
   * Do the shipping logic: split the entries into per-region batches, ship them to the peer
   * cluster in parallel, and retry any failed batches while this endpoint is still running.
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
    List<List<Entry>> batches;
    String walGroupId = replicateContext.getWalGroupId();
    int sleepMultiplier = 1;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      LOG.warn("No replication sinks found, returning without replicating. The source should retry"
          + " with the same set of edits.");
      return false;
    }

    batches = createBatches(replicateContext.getEntries());

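    // Retry until every batch has shipped or this endpoint is stopped. Successful batches are
    // replaced with empty lists below, so only the failed batches are resubmitted on the next
    // pass through the loop.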
    while (this.isRunning() && !exec.isShutdown()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      if (this.conn == null || this.conn.isClosed()) {
        reconnectToPeerCluster();
      }
      try {
        int futures = 0;
        for (int i = 0; i < batches.size(); i++) {
          List<Entry> entries = batches.get(i);
          if (!entries.isEmpty()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Submitting " + entries.size() +
                  " entries of total size " + replicateContext.getSize());
            }
            // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
            pool.submit(createReplicator(entries, i));
            futures++;
          }
        }
        IOException iox = null;

        long lastWriteTime = 0;
        for (int i = 0; i < futures; i++) {
          try {
            // wait for all futures, remove successful parts
            // (only the remaining parts will be retried)
            Future<Integer> f = pool.take();
            int index = f.get().intValue();
            List<Entry> batch = batches.get(index);
            batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
            // Find the most recent write time in the batch
            long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
            if (writeTime > lastWriteTime) {
              lastWriteTime = writeTime;
            }
          } catch (InterruptedException ie) {
            iox = new IOException(ie);
          } catch (ExecutionException ee) {
            // cause must be an IOException
            iox = (IOException) ee.getCause();
          }
        }
        if (iox != null) {
          // if we had any exceptions, try again
          throw iox;
        }
        // update metrics
        if (lastWriteTime > 0) {
          this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
        }
        return true;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
          if (ioe instanceof TableNotFoundException) {
            if (dropOnDeletedTables) {
              // this is a bit fragile, but cannot change how TNFE is serialized
              // at least check whether the table name is legal
              TableName table = parseTable(ioe.getMessage());
              if (table != null) {
                try (Connection localConn =
                    ConnectionFactory.createConnection(ctx.getLocalConfiguration())) {
                  if (!localConn.getAdmin().tableExists(table)) {
                    // Would potentially be better to retry in one of the outer loops
                    // and add a table filter there; but that would break the encapsulation,
                    // so we're doing the filtering here.
                    LOG.info("Missing table detected at sink, local table also does not exist, "
                        + "filtering edits for '" + table + "'");
                    batches = filterBatches(batches, table);
                    continue;
                  }
                } catch (IOException iox) {
                  LOG.warn("Exception checking for local table: ", iox);
                }
              }
            }
            // fall through and sleep below
          } else {
            LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing happened. The peer
            // cluster is likely alive but slow, so retrying right away would only make things
            // worse.
            sleepForRetries("Encountered a SocketTimeoutException. The call to the remote " +
              "cluster timed out, which is usually caused by a machine failure or a massive " +
              "slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("Can't replicate because of a local or network error: ", ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

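  /**
   * @return true if the replication peer is currently enabled; replication is paused while the
   *         peer is disabled.
   */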
  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("Failed to close the connection");
      }
    }
    // Allow currently running replication tasks to finish
    exec.shutdown();
    try {
      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
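      // Ignored; we are stopping anyway, and we abort below if the tasks did not finish in time.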
    }
    // Abort if the tasks did not terminate in time
    if (!exec.isTerminated()) {
      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
          "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
          "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
      abortable.abort(errMsg, new IOException(errMsg));
    }
    notifyStopped();
  }

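  /**
   * Factory for the per-batch {@link Replicator} task. Protected and visible for testing so
   * subclasses and tests can supply their own implementation.
   */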
  @VisibleForTesting
  protected Replicator createReplicator(List<Entry> entries, int ordinal) {
    return new Replicator(entries, ordinal);
  }

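  /**
   * A {@link Callable} that ships a single batch of WAL entries to one sink region server and
   * returns the batch's ordinal so the caller can mark that batch as successfully replicated.
   */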
  @VisibleForTesting
  protected class Replicator implements Callable<Integer> {
    private List<Entry> entries;
    private int ordinal;
    public Replicator(List<Entry> entries, int ordinal) {
      this.entries = entries;
      this.ordinal = ordinal;
    }

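    /**
     * Ships one batch of WAL entries to the given sink region server through the admin
     * protocol, tracing the batch size before the call and the outcome afterwards.
     */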
    protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
        String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
        throws IOException {
      if (LOG.isTraceEnabled()) {
        long size = 0;
        for (Entry e: entries) {
          size += e.getKey().estimatedSerializedSizeOf();
          size += e.getEdit().estimatedSerializedSizeOf();
        }
        LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " +
            entries.size() + " entries with total size " + size + " bytes to " +
            replicationClusterId);
      }
      try {
        ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Completed replicating batch " + System.identityHashCode(entries));
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e);
        }
        throw e;
      }
    }

    @Override
    public Integer call() throws IOException {
      SinkPeer sinkPeer = null;
      try {
        sinkPeer = replicationSinkMgr.getReplicationSink();
        BlockingInterface rrs = sinkPeer.getRegionServer();
        replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
        replicationSinkMgr.reportSinkSuccess(sinkPeer);
        return ordinal;
      } catch (IOException ioe) {
        if (sinkPeer != null) {
          replicationSinkMgr.reportBadSink(sinkPeer);
        }
        throw ioe;
      }
    }
  }
}