/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * <p>
 * This class is responsible for replicating the edits coming from another cluster.
 * </p>
 * <p>
 * The replication process currently waits for the edits to be applied before returning.
 * This means the application of edits is synchronous (after reading from WALs in
 * ReplicationSource) and that a single region server cannot receive edits from two
 * sources at the same time.
 * </p>
 * <p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
 *
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);
  private final Configuration conf;
  // Volatile so the double-checked locking in getConnection() is safe; see:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  // Supplies the source cluster's filesystem configuration, used for bulk load replication
  private SourceFSConfigurationProvider provider;
  // Optional filter applied to incoming WAL entries before they are replayed
  private WALEntrySinkFilter walEntrySinkFilter;

  /**
   * Create a sink for replication.
   *
   * @param conf    configuration to use
   * @param stopper the stoppable that signals when the hosting region server is stopping
   * @throws IOException if the sink cannot be initialized
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();
    this.walEntrySinkFilter = setupWALEntrySinkFilter();
    String className =
        conf.get("hbase.replication.source.fs.conf.provider",
          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " could not be instantiated.", e);
    }
  }

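  /**
   * Instantiate the optional {@link WALEntrySinkFilter} named by
   * {@link WALEntrySinkFilter#WAL_ENTRY_FILTER_KEY}, if one is configured.
   * @return the initialized filter, or null if none is configured or it could not be created
   */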
  private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
    Class<?> walEntryFilterClass =
        this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
    WALEntrySinkFilter filter = null;
    try {
      filter = walEntryFilterClass == null ? null :
          (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      LOG.warn("Failed to instantiate " + walEntryFilterClass, e);
    }
    if (filter != null) {
      filter.init(getConnection());
    }
    return filter;
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the timeout and the number of retries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
    // Use the server's ZK cluster for replication, so unset any client ZK related properties
    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client.
   * Operates only on the raw protobuf types, saving a conversion from protobuf to POJO.
   * @param entries the WAL entries to apply
   * @param cells cell scanner holding the cells referenced by the entries
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        Cell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to list of (family, hfile paths) pairs for this cluster id list
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.get(bld.getClusterIdsList());
              if (bulkLoadHFileMap == null) {
                bulkLoadHFileMap = new HashMap<>();
                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
              }
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation =
                  CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values());
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>, Map<String, List<Pair<byte[],
          List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Started replicating bulk loaded data from cluster ids: {}.", entry.getKey());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating bulk loaded data from cluster ids: {}.",
                entry.getKey());
            }
          }
        }
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

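  /**
   * Collect, per table, the (column family, hfile path) pairs described by a bulk load
   * descriptor so that the referenced hfiles can be replicated as a group.
   * @param bulkLoadHFileMap map of table name to family/hfile-path pairs, updated in place
   * @param table table the bulk load was applied to
   * @param bld bulk load descriptor read from the WAL edit
   */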
  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      BulkLoadDescriptor bld) throws IOException {
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
        String tableName = table.getNameWithNamespaceInclAsString();
        List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
        if (familyHFilePathsList != null) {
          boolean foundFamily = false;
          for (Pair<byte[], List<String>> familyHFilePathsPair : familyHFilePathsList) {
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

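  /**
   * Add a new (family, hfile path) pair to an existing table's list of family/hfile-path pairs.
   */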
  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<>(family, hfilePaths));
  }

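  /**
   * Register a table seen for the first time, mapping it to a single (family, hfile path) pair.
   */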
  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<>(1);
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair = new Pair<>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList = new ArrayList<>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

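  /**
   * Build the hfile path relative to the source namespace directory:
   * namespace/table/encoded-region-name/family/store-file.
   */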
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the cell processed immediately before {@code cell}, or null
   * @param cell the cell currently being processed
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRows(previousCell, cell);
  }

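  /**
   * Convert a protobuf UUID to a {@link java.util.UUID}.
   */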
  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a multimap keyed by two keys.
   * TODO: Make a general utility method
   * @param map map of maps holding the values
   * @param key1 outer key, e.g. the table name
   * @param key2 inner key, e.g. the list of cluster ids
   * @param value value to add
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services: closes the shared cluster connection, if one was
   * opened. Called when the region server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations against the local cluster, one batch call per list of rows.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @throws IOException if the batch cannot be applied, e.g. the target table does not exist
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      Connection connection = getConnection();
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        table.batch(rows, null);
      }
    } catch (RetriesExhaustedWithDetailsException rewde) {
      for (Throwable ex : rewde.getCauses()) {
        if (ex instanceof TableNotFoundException) {
          throw new TableNotFoundException("'" + tableName + "'");
        }
      }
      throw rewde;
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

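  /**
   * Lazily create, and then reuse, the shared cluster connection, using double-checked locking
   * so that concurrent callers do not create more than one connection.
   */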
  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics.
   * @return string with the total replicated edits count and the age, in milliseconds, of
   * the last edit that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
      ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication sink metrics.
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}