/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * This replication process currently waits for the edits to be applied before the method can
 * return. This means that the replication of edits is synchronous (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at the
 * same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
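 * <p>
 * A rough usage sketch (illustrative only, assuming {@code conf}, {@code entries}, {@code cells}
 * and the source path strings are already in hand; in practice the sink is driven by the region
 * server's replication handler rather than called directly):
 * </p>
 *
 * <pre>{@code
 * ReplicationSink sink = new ReplicationSink(conf);
 * // entries and cells arrive over the wire from a source cluster's shipper
 * sink.replicateEntries(entries, cells, replicationClusterId, sourceBaseNamespaceDirPath,
 *   sourceHFileArchiveDirPath);
 * String stats = sink.getStats();
 * sink.stopReplicationSinkServices();
 * }</pre>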
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  /**
   * This shared {@link Connection} is used for handling bulk load hfiles replication.
   */
  private volatile Connection sharedConnection;
  /**
   * This shared {@link AsyncConnection} is used for handling wal replication.
   */
  private volatile AsyncConnection sharedAsyncConnection;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedConnectionLock = new Object();
  private final Object sharedAsyncConnectionLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;

  /**
   * Create a sink for replication.
   * @param conf the configuration to use
   * @throws IOException if the sink cannot be set up, e.g. the WAL entry sink filter or its shared
   *                     connection cannot be created
   */
  public ReplicationSink(Configuration conf) throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException(
        "Configured source fs configuration provider class " + className
          + " could not be instantiated.",
        e);
    }
  }

  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
      this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null
        ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays: lessen the
   * timeout and the number of retries.
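   * <p>
   * With the defaults this is roughly equivalent to the following sketch (illustrative only,
   * spelling out the literal keys behind the {@link HConstants} constants):
   * </p>
   *
   * <pre>{@code
   * conf.setInt("hbase.client.retries.number", 4);
   * conf.setInt("hbase.client.operation.timeout", 10000);
   * }</pre>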
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client.
   * Operates only against the raw protobuf types, saving on a conversion from pb to pojo.
   * @param entries                    the WAL entries to replicate
   * @param cells                      a scanner over the cells associated with the entries, in
   *                                   order
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   * @throws IOException if the data could not be replicated
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
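      // Shape (illustrative): { t1 -> { [uuidA] -> [put1, put2], [uuidA, uuidB] -> [delete1] } }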
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name vs. list of pairs of (family, list of hfile
              // paths from its namespace)
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }

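  /**
   * Collect, from a bulk load descriptor, the column families and the relative paths of their
   * hfiles into the per-table map consumed by {@link HFileReplicator}.
   */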
  private void buildBulkLoadHFileMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
    BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
    List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
    String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

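  /**
   * Build the relative path of a bulk loaded hfile under the source namespace directory, of the
   * form {@code namespace/table/encoded-region-name/family/storefile} (illustrative, e.g.
   * {@code default/usertable/0123abcd/cf/f1a2b3}).
   */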
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
    byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
      .append(table.getQualifierAsString()).append(Path.SEPARATOR)
      .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
      .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the cell from the previous iteration, or null if this is the first cell
   * @param cell         the current cell
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a two-level map of key to (a list of) values. TODO: Make a
   * general utility method.
   * @param map   the map to add to
   * @param key1  the outer key
   * @param key2  the inner key
   * @param value the value to append to the list stored under key1 and key2
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services and close the shared connections. It is called when the
   * region server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedConnection != null) {
        synchronized (sharedConnectionLock) {
          if (this.sharedConnection != null) {
            this.sharedConnection.close();
            this.sharedConnection = null;
          }
        }
      }
    } catch (IOException e) {
      // ignoring as we are closing.
      LOG.warn("IOException while closing the sharedConnection", e);
    }

    try {
      if (this.sharedAsyncConnection != null) {
        synchronized (sharedAsyncConnectionLock) {
          if (this.sharedAsyncConnection != null) {
            this.sharedAsyncConnection.close();
            this.sharedAsyncConnection = null;
          }
        }
      }
    } catch (IOException e) {
      // ignoring as we are closing.
      LOG.warn("IOException while closing the sharedAsyncConnection", e);
    }
  }

  /**
   * Apply the given rows to the given table, partitioning oversized batches before submission.
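   * <p>
   * For example (illustrative): with a {@code batchRowSizeThreshold} of 1000, a list of 2500 rows
   * is submitted as three sub-batches of 1000, 1000 and 500 rows.
   * </p>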
   * @param tableName             table to insert into
   * @param allRows               list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    AsyncTable<?> table = getAsyncConnection().getTable(tableName);
    List<Future<?>> futures = new ArrayList<>();
    for (List<Row> rows : allRows) {
      List<List<Row>> batchRows;
      if (rows.size() > batchRowSizeThreshold) {
        batchRows = Lists.partition(rows, batchRowSizeThreshold);
      } else {
        batchRows = Collections.singletonList(rows);
      }
      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
    }

    for (Future<?> future : futures) {
      try {
        FutureUtils.get(future);
      } catch (RetriesExhaustedException e) {
        if (e.getCause() instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
        throw e;
      }
    }
  }

  /**
   * Return the shared {@link Connection} which is used for handling bulk load hfiles replication.
   */
  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedConnection;
    if (connection == null) {
      synchronized (sharedConnectionLock) {
        connection = sharedConnection;
        if (connection == null) {
          connection = ConnectionFactory.createConnection(conf);
          sharedConnection = connection;
        }
      }
    }
    return connection;
  }

  /**
   * Return the shared {@link AsyncConnection} which is used for handling wal replication.
   */
  private AsyncConnection getAsyncConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    AsyncConnection asyncConnection = sharedAsyncConnection;
    if (asyncConnection == null) {
      synchronized (sharedAsyncConnectionLock) {
        asyncConnection = sharedAsyncConnection;
        if (asyncConnection == null) {
          // Block until the AsyncConnection is ready, then cache it.
          asyncConnection = FutureUtils.get(ConnectionFactory.createAsyncConnection(conf));
          sharedAsyncConnection = asyncConnection;
        }
      }
    }
    return asyncConnection;
  }

  /**
   * Get a string representation of this sink's metrics.
   * @return string with the total replicated edits count and the age (in milliseconds) of the
   *         last applied edit
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0
      ? ""
      : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
        + ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication sink metrics.
   * @return the replication sink metrics
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}