/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * This class is responsible for replicating the edits coming
 * from another cluster.
 * </p><p>
 * This replication process waits for all edits to be applied before the
 * method returns. This means that replication of edits is synchronous
 * (after reading from WALs in ReplicationSource) and that a single region
 * server cannot receive edits from two sources at the same time.
 * </p><p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
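 * <p>
 * A minimal usage sketch (illustrative only; in a running cluster the sink is created and
 * driven by the region server's replication services, with entries and cells arriving
 * together over the ReplicateWALEntry RPC, rather than being called directly):
 * </p>
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf, stopper);
 * sink.replicateEntries(entries, cells, replicationClusterId,
 *     sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
 * </pre>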
 *
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
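  // Provides per-source-cluster filesystem Configurations, used when copying bulk loaded hfiles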
  private SourceFSConfigurationProvider provider;
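  // Optional filter on incoming WAL entries; entries it matches are skipped, not replicated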
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Create a sink for replication
   *
   * @param conf                conf object
   * @param stopper             the Stoppable used to signal that the region server is stopping
   * @throws IOException thrown when the sink cannot be initialized
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className =
        conf.get("hbase.replication.source.fs.conf.provider",
          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " could not be instantiated.", e);
    }
  }

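  /**
   * Instantiate the WALEntrySinkFilter named by {@link WALEntrySinkFilter#WAL_ENTRY_FILTER_KEY},
   * if one is configured; returns null when no filter is configured or instantiation fails.
   */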
  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
        this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null ? null :
          (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the timeout and the number of retries.
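   * <p>
   * These can be overridden in hbase-site.xml; e.g. (values shown are the code defaults,
   * not a recommendation):
   * </p>
   * <pre>
   * &lt;property&gt;
   *   &lt;name&gt;replication.sink.client.retries.number&lt;/name&gt;
   *   &lt;value&gt;4&lt;/value&gt;
   * &lt;/property&gt;
   * &lt;property&gt;
   *   &lt;name&gt;replication.sink.client.ops.timeout&lt;/name&gt;
   *   &lt;value&gt;10000&lt;/value&gt;
   * &lt;/property&gt;
   * </pre>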
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // use server ZK cluster for replication, so we unset the client ZK related properties if any
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against raw protobuf type saving on a conversion from pb to pojo.
   * @param entries the WAL entries to apply locally
   * @param cells scanner over the Cells belonging to the entries, in entry order
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name Vs list of pair of family and list of
              // hfile paths from its namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.get(bld.getClusterIdsList());
              if (bulkLoadHFileMap == null) {
                bulkLoadHFileMap = new HashMap<>();
                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
              }
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation =
                  CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values());
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>, Map<String, List<Pair<byte[],
          List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
              entry.getKey());
            HFileReplicator hFileReplicator =
              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                getConnection(), entry.getKey());
            hFileReplicator.replicate();
            LOG.debug("Finished replicating bulk loaded data from cluster ids: {}.",
              entry.getKey());
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

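  /**
   * Collect, per column family, the relative paths of the hfiles named in the bulk load
   * descriptor, accumulating them into the passed bulkLoadHFileMap keyed by table name.
   */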
  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);

        String tableName = table.getNameWithNamespaceInclAsString();
        if (bulkLoadHFileMap.containsKey(tableName)) {
          List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
          boolean foundFamily = false;
          for (int i = 0; i < familyHFilePathsList.size(); i++) {
            Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i);
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

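  /**
   * Build the relative path of an hfile under the source namespace directory, of the form
   * namespace/table/encoded-region-name/family/storefile.
   */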
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the last cell appended, or null if this is the first cell of the entry
   * @param cell the current cell
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRows(previousCell, cell);
  }

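  /** Convert a protobuf UUID message into its java.util.UUID equivalent. */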
  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to the list stored in a two-level map (key1 -> key2 -> values)
   * TODO: Make a general utility method
   * @param map the two-level map to add to
   * @param key1 the outer key
   * @param key2 the inner key
   * @param value the value to append
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  /**
   * Close the shared cluster connection. It is called when the regionserver is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations to their table, issuing one batch call per list of rows.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @throws IOException if replication to the table fails
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      Connection connection = getConnection();
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        table.batch(rows, null);
      }
    } catch (RetriesExhaustedWithDetailsException rewde) {
      for (Throwable ex : rewde.getCauses()) {
        if (ex instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
      }
      throw rewde;
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age
   * of the last edit that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
      ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}