/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/*
 * Abstract class that provides an interface to the Replication Table, which is currently
 * used for WAL offset tracking.
 * The basic schema of this table stores each individual queue as a separate row. The row key is a
 * unique identifier composed of the creating server's name and the queueId. Each queue must have
 * the following two columns:
 *  COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue
 *  COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous servers that have owned this
 *    queue. The most recent previous owner is the leftmost entry.
 * Each queue row also has one column per WAL file, mapping [WAL filename : offset].
 * The most flexible method of interacting with the Replication Table is to call
 * getOrBlockOnReplicationTable(), which returns a new copy of the Replication Table. It is up
 * to the caller to close the returned table.
 */
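/*
 * Illustrative usage sketch only (the queueId and server name below are hypothetical, not part of
 * the class contract): a subclass reading one queue's metadata and WAL offsets might look roughly
 * like this.
 *
 *   try (Table replicationTable = getOrBlockOnReplicationTable()) {
 *     // Row key format is "<queueId><ROW_KEY_DELIMITER><serverName>"
 *     Get get = new Get(Bytes.toBytes("1" + ROW_KEY_DELIMITER + "example-host,16020,1234567890"));
 *     Result queue = replicationTable.get(get);
 *     byte[] owner = queue.getValue(CF_QUEUE, COL_QUEUE_OWNER);
 *     List<String> wals = readWALsFromResult(queue);
 *   }
 */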
@InterfaceAudience.Private
abstract class ReplicationTableBase {

  /** Name of the HBase Table used for tracking replication */
  public static final TableName REPLICATION_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");

  // Column family and column names for Queues in the Replication Table
  public static final byte[] CF_QUEUE = Bytes.toBytes("q");
  public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o");
  public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h");

  // Column Descriptor for the Replication Table
  private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
    new HColumnDescriptor(CF_QUEUE).setMaxVersions(1)
      .setInMemory(true)
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      // TODO: Figure out which bloom filter to use
      .setBloomFilterType(BloomType.NONE);

  // The value used to delimit the queueId and server name inside of a queue's row key. Currently a
  // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
  // See HBASE-11394.
  public static final String ROW_KEY_DELIMITER = "-";

  // The value used to delimit server names in the queue history list
  public static final String QUEUE_HISTORY_DELIMITER = "|";

  /*
   * Make sure that HBase table operations for replication have a high number of retries. This is
   * because the server is aborted if any HBase table operation fails. Each RPC will be attempted
   * 3600 times before exiting. This provides each operation with 2 hours of retries
   * before the server is aborted.
   */
  private static final int CLIENT_RETRIES = 3600;
  private static final int RPC_TIMEOUT = 2000;
  private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
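  // For reference: 3600 retries * 2000 ms per attempt = 7,200,000 ms, i.e. the roughly two hours
  // of retries described in the comment above.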

  // We only need a single thread to initialize the Replication Table
  private static final int NUM_INITIALIZE_WORKERS = 1;

  protected final Configuration conf;
  protected final Abortable abortable;
  private final Connection connection;
  private final Executor executor;
  private volatile CountDownLatch replicationTableInitialized;

  public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
    this.conf = new Configuration(conf);
    this.abortable = abort;
    decorateConf();
    this.connection = ConnectionFactory.createConnection(this.conf);
    this.executor = setUpExecutor();
    this.replicationTableInitialized = new CountDownLatch(1);
    createReplicationTableInBackground();
  }

  /**
   * Modify the configuration used for the Replication Table connection so that its operations
   * are retried many more times than the default
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
  }

  /**
   * Sets up the thread pool executor used to build the Replication Table in the background
   * @return the configured executor
   */
  private Executor setUpExecutor() {
    ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
        NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationTableExecutor-%d");
    tfb.setDaemon(true);
    tempExecutor.setThreadFactory(tfb.build());
    return tempExecutor;
  }

  /**
   * Get whether the Replication Table has been successfully initialized yet
   * @return whether the Replication Table is initialized
   */
  public boolean getInitializationStatus() {
    return replicationTableInitialized.getCount() == 0;
  }

  /**
   * Sets the RPC and operation timeouts used for the Replication Table
   */
  private Table setReplicationTableTimeOuts(Table replicationTable) {
    replicationTable.setRpcTimeout(RPC_TIMEOUT);
    replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
    return replicationTable;
  }

  /**
   * Build the row key for the given queueId. This will uniquely identify it from all other queues
   * in the cluster.
   * @param serverName The owner of the queue
   * @param queueId String identifier of the queue
   * @return String representation of the queue's row key
   */
  protected String buildQueueRowKey(String serverName, String queueId) {
    return queueId + ROW_KEY_DELIMITER + serverName;
  }
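
  // Illustrative note with hypothetical values: for serverName "example-host,16020,1234567890"
  // and queueId "1", buildQueueRowKey() above produces "1-example-host,16020,1234567890", and
  // getRawQueueIdFromRowKey() below recovers "1" from that row key.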

  /**
   * Parse the original queueId from a row key
   * @param rowKey String representation of a queue's row key
   * @return the original queueId
   */
  protected String getRawQueueIdFromRowKey(String rowKey) {
    return rowKey.split(ROW_KEY_DELIMITER)[0];
  }

  /**
   * Returns a queue's row key given either its raw or reclaimed queueId
   *
   * @param serverName the name of the server that owns the queue
   * @param queueId queueId of the queue
   * @return byte representation of the queue's row key
   */
  protected byte[] queueIdToRowKey(String serverName, String queueId) {
    // Cluster ids are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
    // then this is not a reclaimed queue.
    if (!queueId.contains(ROW_KEY_DELIMITER)) {
      return Bytes.toBytes(buildQueueRowKey(serverName, queueId));
    } else {
      // If the queueId contained a hyphen it was reclaimed. In this case, the queueId is already
      // the queue's row key
      return Bytes.toBytes(queueId);
    }
  }
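
  // Sketch of the two cases above (hypothetical values): passing queueId "1" with serverName
  // "hostB,16020,222" yields the row key "1-hostB,16020,222", whereas an already-reclaimed
  // queueId such as "1-hostA,16020,111" is returned unchanged as the row key.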

  /**
   * Creates a "|" delimited record of the queue's past region server owners.
   *
   * @param originalHistory the queue's original owner history
   * @param oldServer the name of the server that used to own the queue
   * @return the queue's new owner history
   */
  protected String buildClaimedQueueHistory(String originalHistory, String oldServer) {
    return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory;
  }
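
  // For example (hypothetical server names): buildClaimedQueueHistory("hostB,16020,222",
  // "hostC,16020,333") returns "hostC,16020,333|hostB,16020,222", keeping the most recent
  // previous owner leftmost.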

  /**
   * Get a list of all region servers that have outstanding replication queues. These servers could
   * be alive, dead or from a previous run of the cluster.
   * @return a list of server names
   */
  protected List<String> getListOfReplicators() {
    // scan all of the queues and return a list of all unique OWNER values
    Set<String> peerServers = new HashSet<>();
    ResultScanner allQueuesInCluster = null;
    try (Table replicationTable = getOrBlockOnReplicationTable()) {
      Scan scan = new Scan();
      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
      allQueuesInCluster = replicationTable.getScanner(scan);
      for (Result queue : allQueuesInCluster) {
        peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER)));
      }
    } catch (IOException e) {
      String errMsg = "Failed getting list of replicators";
      abortable.abort(errMsg, e);
    } finally {
      if (allQueuesInCluster != null) {
        allQueuesInCluster.close();
      }
    }
    return new ArrayList<>(peerServers);
  }

  protected List<String> getAllQueues(String serverName) {
    List<String> allQueues = new ArrayList<>();
    ResultScanner queueScanner = null;
    try {
      queueScanner = getQueuesBelongingToServer(serverName);
      for (Result queue : queueScanner) {
        String rowKey = Bytes.toString(queue.getRow());
        // If the queue does not have an Owner History, then we must be its original owner. So we
        // want to return its queueId in raw form
        byte[] ownerHistory = queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
        if (ownerHistory == null || ownerHistory.length == 0) {
          allQueues.add(getRawQueueIdFromRowKey(rowKey));
        } else {
          allQueues.add(rowKey);
        }
      }
      return allQueues;
    } catch (IOException e) {
      String errMsg = "Failed getting list of all replication queues for serverName=" + serverName;
      abortable.abort(errMsg, e);
      return null;
    } finally {
      if (queueScanner != null) {
        queueScanner.close();
      }
    }
  }

  protected List<String> getLogsInQueue(String serverName, String queueId) {
    String rowKey = queueId;
    if (!queueId.contains(ROW_KEY_DELIMITER)) {
      rowKey = buildQueueRowKey(serverName, queueId);
    }
    return getLogsInQueue(Bytes.toBytes(rowKey));
  }

  protected List<String> getLogsInQueue(byte[] rowKey) {
    String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
    try (Table replicationTable = getOrBlockOnReplicationTable()) {
      Get getQueue = new Get(rowKey);
      Result queue = replicationTable.get(getQueue);
      if (queue == null || queue.isEmpty()) {
        abortable.abort(errMsg, new ReplicationException(errMsg));
        return null;
      }
      return readWALsFromResult(queue);
    } catch (IOException e) {
      abortable.abort(errMsg, e);
      return null;
    }
  }

  /**
   * Read all of the WALs from a queue into a list
   *
   * @param queue HBase query result containing the queue
   * @return a list of all the WAL filenames
   */
  protected List<String> readWALsFromResult(Result queue) {
    List<String> wals = new ArrayList<>();
    Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
    for (byte[] cQualifier : familyMap.keySet()) {
      // Ignore the metadata fields of the queue
      if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
          COL_QUEUE_OWNER_HISTORY)) {
        continue;
      }
      wals.add(Bytes.toString(cQualifier));
    }
    return wals;
  }
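
  // Sketch of a queue row with hypothetical values: qualifier "o" holds the owning server name,
  // "h" holds the owner history, and every other qualifier is a WAL filename whose cell value is
  // the replicated offset. For such a row, readWALsFromResult() returns only the WAL filenames,
  // e.g. ["example-host%2C16020%2C1234.1111", "example-host%2C16020%2C1234.2222"].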

  /**
   * Get the queue ids and metadata (Owner and History) for the queues belonging to the named
   * server
   *
   * @param server name of the server
   * @return a ResultScanner over the QueueIds belonging to the server
   * @throws IOException
   */
  protected ResultScanner getQueuesBelongingToServer(String server) throws IOException {
    Scan scan = new Scan();
    SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
        CompareOperator.EQUAL, Bytes.toBytes(server));
    scan.setFilter(filterMyQueues);
    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
    try (Table replicationTable = getOrBlockOnReplicationTable()) {
      ResultScanner results = replicationTable.getScanner(scan);
      return results;
    }
  }

  /**
   * Attempts to acquire the Replication Table. This operation will block until the table is
   * created by the CreateReplicationTableWorker thread. It is up to the caller of this method to
   * close the returned Table
   * @return the Replication Table when it is created
   * @throws IOException
   */
  protected Table getOrBlockOnReplicationTable() throws IOException {
    // Sleep until the Replication Table becomes available
    try {
      replicationTableInitialized.await();
    } catch (InterruptedException e) {
      String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " +
          e.getMessage();
      throw new InterruptedIOException(errMsg);
    }
    return getAndSetUpReplicationTable();
  }

  /**
   * Creates a new copy of the Replication Table and sets up the proper Table timeouts for it
   *
   * @return the Replication Table
   * @throws IOException
   */
  private Table getAndSetUpReplicationTable() throws IOException {
    Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
    setReplicationTableTimeOuts(replicationTable);
    return replicationTable;
  }

  /**
   * Submits a background task that builds the Replication Table. Any method accessing the
   * Replication Table should do so through getOrBlockOnReplicationTable()
   *
   * @throws IOException
   */
  private void createReplicationTableInBackground() throws IOException {
    executor.execute(new CreateReplicationTableWorker());
  }

  /**
   * Attempts to build the Replication Table and keeps retrying until the table exists, at which
   * point it releases the latch that getOrBlockOnReplicationTable() waits on.
   */
  private class CreateReplicationTableWorker implements Runnable {

    private Admin admin;

    @Override
    public void run() {
      try {
        admin = connection.getAdmin();
        if (!replicationTableExists()) {
          createReplicationTable();
        }
        int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number",
            CLIENT_RETRIES);
        RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT);
        RetryCounter retryCounter = counterFactory.create();
        while (!replicationTableExists()) {
          retryCounter.sleepUntilNextRetry();
          if (!retryCounter.shouldRetry()) {
            throw new IOException("Unable to acquire the Replication Table");
          }
        }
        replicationTableInitialized.countDown();
      } catch (IOException | InterruptedException e) {
        abortable.abort("Failed building Replication Table", e);
      }
    }

    /**
     * Create the Replication Table with the column descriptor REPLICATION_COL_DESCRIPTOR defined
     * in ReplicationTableBase
     *
     * @throws IOException
     */
    private void createReplicationTable() throws IOException {
      HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
      replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
      try {
        admin.createTable(replicationTableDescriptor);
      } catch (TableExistsException e) {
        // In this case we can just continue as normal
      }
    }

    /**
     * Checks whether the Replication Table exists yet, treating any IOException from the check as
     * "does not exist yet"
     *
     * @return whether the Replication Table exists
     */
    private boolean replicationTableExists() {
      try {
        return admin.tableExists(REPLICATION_TABLE_NAME);
      } catch (IOException e) {
        return false;
      }
    }
  }
}