001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_INFO_FAMILY;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Queue;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.RetryCounter;
035import org.apache.hadoop.hbase.util.RetryCounterFactory;
036import org.apache.yetus.audience.InterfaceAudience;
037
038@InterfaceAudience.Private
039public final class WALEventTrackerTableAccessor {
040  public static final String RS_COLUMN = "region_server_name";
041  public static final String WAL_NAME_COLUMN = "wal_name";
042  public static final String TIMESTAMP_COLUMN = "timestamp";
043  public static final String WAL_STATE_COLUMN = "wal_state";
044  public static final String WAL_LENGTH_COLUMN = "wal_length";
045  public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts";
046  public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec";
047  public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec";
048  public static final int DEFAULT_MAX_ATTEMPTS = 3;
049  public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second
050  public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds
051  public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER";
052  public static final String DELIMITER = "_";
053
054  private WALEventTrackerTableAccessor() {
055  }
056
057  /**
058   * {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config -
059   * hbase.regionserver.wal.event.tracker.enabled
060   */
061  public static final TableName WAL_EVENT_TRACKER_TABLE_NAME =
062    TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR);
063
064  private static void doPut(final Connection connection, final List<Put> puts) throws Exception {
065    RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create();
066    while (true) {
067      try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) {
068        table.put(puts);
069        return;
070      } catch (IOException ioe) {
071        retryOrThrow(retryCounter, ioe);
072      }
073      retryCounter.sleepUntilNextRetry();
074    }
075  }
076
077  private static RetryCounterFactory getRetryFactory(Configuration conf) {
078    int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS);
079    long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
080    long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME);
081    RetryCounter.RetryConfig retryConfig =
082      new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs,
083        TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit());
084    return new RetryCounterFactory(retryConfig);
085  }
086
087  private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException {
088    if (retryCounter.shouldRetry()) {
089      return;
090    }
091    throw ioe;
092  }
093
094  /**
095   * Add wal event tracker rows to hbase:waleventtracker table
096   * @param walEventPayloads List of walevents to process
097   * @param connection       Connection to use.
098   */
099  public static void addWalEventTrackerRows(Queue<WALEventTrackerPayload> walEventPayloads,
100    final Connection connection) throws Exception {
101    List<Put> puts = new ArrayList<>(walEventPayloads.size());
102    for (WALEventTrackerPayload payload : walEventPayloads) {
103      final byte[] rowKey = getRowKey(payload);
104      final Put put = new Put(rowKey);
105      // TODO Do we need to SKIP_WAL ?
106      put.setPriority(HConstants.NORMAL_QOS);
107      put
108        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(RS_COLUMN),
109          Bytes.toBytes(payload.getRsName()))
110        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_NAME_COLUMN),
111          Bytes.toBytes(payload.getWalName()))
112        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(TIMESTAMP_COLUMN),
113          Bytes.toBytes(payload.getTimeStamp()))
114        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_STATE_COLUMN),
115          Bytes.toBytes(payload.getState()))
116        .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_LENGTH_COLUMN),
117          Bytes.toBytes(payload.getWalLength()));
118      puts.add(put);
119    }
120    doPut(connection, puts);
121  }
122
123  /**
124   * Create rowKey: 1. We want RS name to be the leading part of rowkey so that we can query by RS
125   * name filter. WAL name contains rs name as a leading part. 2. Timestamp when the event was
126   * generated. 3. Add state of the wal. Combination of 1 + 2 + 3 is definitely going to create a
127   * unique rowkey.
128   * @param payload payload to process
129   * @return rowKey byte[]
130   */
131  public static byte[] getRowKey(final WALEventTrackerPayload payload) {
132    String walName = payload.getWalName();
133    // converting to string since this will help seeing the timestamp in string format using
134    // hbase shell commands.
135    String timestampStr = String.valueOf(payload.getTimeStamp());
136    String walState = payload.getState();
137    final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState;
138    return Bytes.toBytes(rowKeyStr);
139  }
140}