/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * The replication process currently waits for the edits to be applied before the method returns.
 * This means that replication of edits is synchronous (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two sources at the
 * same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
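 * <p>
 * A minimal usage sketch (illustrative only; in practice the region server creates the sink and
 * feeds it batches of entries received from the source cluster):
 * </p>
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf);
 * // entries and cells arrive together from the source cluster's shipper
 * sink.replicateEntries(entries, cells, replicationClusterId, sourceBaseNamespaceDirPath,
 *   sourceHFileArchiveDirPath);
 * sink.stopReplicationSinkServices();
 * </pre>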
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile AsyncClusterConnection sharedConn;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedConnLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;

  /**
   * Create a sink for replication.
   * @param conf the configuration object
   * @throws IOException if HDFS goes bad or the file name is bad
   */
  public ReplicationSink(Configuration conf) throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className = conf.get("hbase.replication.source.fs.conf.provider",
      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      Class<? extends SourceFSConfigurationProvider> c =
        Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
      this.provider = c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
        + className + " could not be instantiated.", e);
    }
  }

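  /**
   * Instantiate the optional {@link WALEntrySinkFilter} named by
   * {@link WALEntrySinkFilter#WAL_ENTRY_FILTER_KEY} in the configuration; returns null when no
   * filter is configured or when instantiation fails.
   */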
  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
      this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null
        ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays: lessen the
   * timeout and the number of retries.
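   * <p>
   * For reference, the sink-side client tuning keys read below, shown with the defaults applied
   * here:
   * </p>
   * <pre>
   * replication.sink.client.retries.number = 4
   * replication.sink.client.ops.timeout = 10000
   * </pre>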
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against raw protobuf type, saving on a conversion from pb to pojo.
   * @param entries                    the WAL entries to replicate
   * @param cells                      cell scanner over the cells referenced by the entries
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to list of (family, list of hfile paths) pairs, with paths
              // relative to the table's namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
                HConstants.EMPTY_BYTE_ARRAY);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }

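  /**
   * Collect, per fully-qualified table name, the (family, hfile path) pairs described by the
   * given {@link BulkLoadDescriptor} into {@code bulkLoadHFileMap}, counting the hfiles seen.
   */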
  private void buildBulkLoadHFileMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
    BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
    List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
    final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
    String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

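  /**
   * Build the path of an hfile relative to the source namespace directory; e.g. for table
   * {@code ns:t1} the result looks like {@code ns/t1/<encodedRegionName>/<family>/<storeFile>}.
   */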
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
    byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
      .append(table.getQualifierAsString()).append(Path.SEPARATOR)
      .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
      .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /** Returns True if we have crossed over onto a new row or type */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map of lists keyed by two keys. TODO: Make a general
   * utility method.
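   * <p>
   * For example, grouping a mutation first by table and then by its list of source cluster ids:
   * </p>
   * <pre>
   * addToHashMultiMap(rowMap, table, clusterIds, mutation);
   * </pre>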
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services and close the shared connection. Called when the region
   * server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedConn != null) {
        synchronized (sharedConnLock) {
          if (this.sharedConn != null) {
            this.sharedConn.close();
            this.sharedConn = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations against the given table, splitting over-large batches.
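   * <p>
   * Each per-cluster-id list of rows becomes one {@code batchAll} call unless it exceeds
   * {@code batchRowSizeThreshold}; e.g. 2500 rows with a threshold of 1000 are submitted as
   * sub-batches of 1000, 1000 and 500 rows.
   * </p>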
   * @param tableName             table to insert into
   * @param allRows               list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    AsyncTable<?> table = getConnection().getTable(tableName);
    List<Future<?>> futures = new ArrayList<>();
    for (List<Row> rows : allRows) {
      List<List<Row>> batchRows;
      if (rows.size() > batchRowSizeThreshold) {
        batchRows = Lists.partition(rows, batchRowSizeThreshold);
      } else {
        batchRows = Collections.singletonList(rows);
      }
      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
    }
    for (Future<?> future : futures) {
      try {
        FutureUtils.get(future);
      } catch (RetriesExhaustedException e) {
        if (e.getCause() instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
        throw e;
      }
    }
  }

  private AsyncClusterConnection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    AsyncClusterConnection connection = sharedConn;
    if (connection == null) {
      synchronized (sharedConnLock) {
        connection = sharedConn;
        if (connection == null) {
          connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
            UserProvider.instantiate(conf).getCurrent());
          sharedConn = connection;
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics.
   * @return string with the total replicated edits count and the age (in ms) of the last edit
   *         that was applied
   */
  public String getStats() {
    long total = this.totalReplicatedEdits.get();
    return total == 0
      ? ""
      : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
        + ", total replicated edits: " + total;
  }

  /**
   * Get replication sink metrics.
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}