001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_INFO_FAMILY;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Queue;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.RetryCounter;
035import org.apache.hadoop.hbase.util.RetryCounterFactory;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040@InterfaceAudience.Private
041public final class WALEventTrackerTableAccessor {
042  private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableAccessor.class);
043
044  public static final String RS_COLUMN = "region_server_name";
045  public static final String WAL_NAME_COLUMN = "wal_name";
046  public static final String TIMESTAMP_COLUMN = "timestamp";
047  public static final String WAL_STATE_COLUMN = "wal_state";
048  public static final String WAL_LENGTH_COLUMN = "wal_length";
049  public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts";
050  public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec";
051  public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec";
052  public static final int DEFAULT_MAX_ATTEMPTS = 3;
053  public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second
054  public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds
055  public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER";
056  public static final String DELIMITER = "_";
057
058  private WALEventTrackerTableAccessor() {
059  }
060
061  /**
062   * {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config -
063   * hbase.regionserver.wal.event.tracker.enabled
064   */
065  public static final TableName WAL_EVENT_TRACKER_TABLE_NAME =
066    TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR);
067
068  private static void doPut(final Connection connection, final List<Put> puts) throws Exception {
069    RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create();
070    while (true) {
071      try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) {
072        table.put(puts);
073        return;
074      } catch (IOException ioe) {
075        retryOrThrow(retryCounter, ioe);
076      }
077      retryCounter.sleepUntilNextRetry();
078    }
079  }
080
081  private static RetryCounterFactory getRetryFactory(Configuration conf) {
082    int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS);
083    long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
084    long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME);
085    RetryCounter.RetryConfig retryConfig =
086      new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs,
087        TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit());
088    return new RetryCounterFactory(retryConfig);
089  }
090
091  private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException {
092    if (retryCounter.shouldRetry()) {
093      return;
094    }
095    throw ioe;
096  }
097
098  /**
099   * Add wal event tracker rows to hbase:waleventtracker table
100   * @param walEventPayloads List of walevents to process
101   * @param connection       Connection to use.
102   */
103  public static void addWalEventTrackerRows(Queue<WALEventTrackerPayload> walEventPayloads,
104    final Connection connection) throws Exception {
105    List<Put> puts = new ArrayList<>(walEventPayloads.size());
106    for (WALEventTrackerPayload payload : walEventPayloads) {
107      final byte[] rowKey = getRowKey(payload);
108      final Put put = new Put(rowKey);
109      // TODO Do we need to SKIP_WAL ?
110      put.setPriority(HConstants.NORMAL_QOS);
111      put
112        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(RS_COLUMN),
113          Bytes.toBytes(payload.getRsName()))
114        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_NAME_COLUMN),
115          Bytes.toBytes(payload.getWalName()))
116        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(TIMESTAMP_COLUMN),
117          Bytes.toBytes(payload.getTimeStamp()))
118        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_STATE_COLUMN),
119          Bytes.toBytes(payload.getState()))
120        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_LENGTH_COLUMN),
121          Bytes.toBytes(payload.getWalLength()));
122      puts.add(put);
123    }
124    doPut(connection, puts);
125  }
126
127  /**
128   * Create rowKey: 1. We want RS name to be the leading part of rowkey so that we can query by RS
129   * name filter. WAL name contains rs name as a leading part. 2. Timestamp when the event was
130   * generated. 3. Add state of the wal. Combination of 1 + 2 + 3 is definitely going to create a
131   * unique rowkey.
132   * @param payload payload to process
133   * @return rowKey byte[]
134   */
135  public static byte[] getRowKey(final WALEventTrackerPayload payload) {
136    String walName = payload.getWalName();
137    // converting to string since this will help seeing the timestamp in string format using
138    // hbase shell commands.
139    String timestampStr = String.valueOf(payload.getTimeStamp());
140    String walState = payload.getState();
141    final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState;
142    return Bytes.toBytes(rowKeyStr);
143  }
144}