/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p><p>
 * The replication process waits for all edits in a shipment to be applied before the
 * method returns. Replication of edits is therefore synchronous (after reading from WALs
 * in ReplicationSource), and a single region server cannot receive edits from two
 * sources at the same time.
 * </p><p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
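 * <p>
 * A minimal usage sketch (hypothetical driver code; in the running server the
 * replication sink service constructs the sink and feeds it batches received
 * over the ReplicateWALEntry RPC):
 * </p>
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf);
 * // entries and cells arrive together from the source cluster
 * sink.replicateEntries(entries, cells, replicationClusterId,
 *   sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
 * LOG.info(sink.getStats());
 * sink.stopReplicationSinkServices();
 * </pre>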
 *
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;

  /**
   * Create a sink for replication.
   * @param conf conf object; a decorated copy is kept internally
   * @throws IOException if the WAL entry sink filter cannot be set up
   */
  public ReplicationSink(Configuration conf) throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    rowSizeWarnThreshold = conf.getInt(
      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className =
        conf.get("hbase.replication.source.fs.conf.provider",
          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " could not be instantiated.", e);
    }
  }

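  /**
   * Instantiate the sink-side WAL entry filter named by
   * {@link WALEntrySinkFilter#WAL_ENTRY_FILTER_KEY}, if any. A sketch of how a
   * deploy might enable one (the filter class here is hypothetical):
   * <pre>
   * conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
   *   MyWALEntrySinkFilter.class, WALEntrySinkFilter.class);
   * </pre>
   * @return the initialized filter, or null if none is configured or it failed
   *   to instantiate
   */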
  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
        this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null ? null :
          (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the timeout and numTries.
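   * <p>
   * A sketch of the equivalent sink-side tuning this reads (the values shown
   * are the defaults applied below):
   * <pre>
   * conf.setInt("replication.sink.client.retries.number", 4);
   * conf.setInt("replication.sink.client.ops.timeout", 10000);
   * </pre>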
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against raw protobuf type saving on a conversion from pb to pojo.
   * @param entries WAL entries to apply
   * @param cells CellScanner over the cells referenced by the entries
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id; we only want to flush once per
      // invocation of this method, per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to list of (family, hfile paths from its namespace) pairs
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                  ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                  : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>, Map<String, List<Pair<byte[],
          List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
              entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating bulk loaded data from cluster ids: {}.",
                entry.getKey().toString());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

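  /**
   * Collect the hfiles named in a bulk load descriptor into
   * {@code bulkLoadHFileMap}, keyed by fully qualified table name, as
   * (family, list of hfile paths) pairs; paths are relative to the source
   * cluster's namespace directory. Also counts the hfiles toward
   * {@code hfilesReplicated}.
   */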
  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

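  /**
   * Build the relative path of an hfile under the source cluster's namespace
   * directory, of the form
   * {@code <namespace>/<table>/<encoded-region-name>/<family>/<storefile>},
   * e.g. (hypothetical names) {@code default/t1/region1/f1/hfile1}.
   */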
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the cell processed just before the current one, or null
   * @param cell the current cell
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRows(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper that adds a value to a two-level multimap (key1 to key2 to list of values).
   * TODO: Make a general utility method
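   * <p>
   * For example (hypothetical call site values): in {@link #replicateEntries} this is invoked
   * as {@code addToHashMultiMap(rowMap, table, clusterIds, mutation)}, which appends
   * {@code mutation} to {@code rowMap.get(table).get(clusterIds)}, creating the inner map and
   * list on first use.
   * </p>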
   * @param map the two-level map to add to
   * @param key1 outer key
   * @param key2 inner key
   * @param value the value to append to the list stored under key1 and key2
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }

  /**
   * Stop this sink's services: close the shared connection to the local cluster.
   * It is called when the regionserver is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations against the given table, splitting oversized batches.
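   * <p>
   * Oversized batches are split with Guava's {@code Lists.partition}; for example
   * (hypothetical sizes) 2500 rows with a threshold of 1000 are submitted as
   * sub-batches of 1000, 1000 and 500 rows.
   * </p>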
   * @param tableName table to insert into
   * @param allRows list of actions
   * @param batchRowSizeThreshold rowSize threshold for batch mutation
   */
  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
      throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      Connection connection = getConnection();
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        List<List<Row>> batchRows;
        if (rows.size() > batchRowSizeThreshold) {
          batchRows = Lists.partition(rows, batchRowSizeThreshold);
        } else {
          batchRows = Collections.singletonList(rows);
        }
        for (List<Row> rowList : batchRows) {
          table.batch(rowList, null);
        }
      }
    } catch (RetriesExhaustedWithDetailsException rewde) {
      for (Throwable ex : rewde.getCauses()) {
        if (ex instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
      }
      throw rewde;
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age
   * of the last edit that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
      ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}