001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_INFO_FAMILY; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Queue; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.RetryCounter; 035import org.apache.hadoop.hbase.util.RetryCounterFactory; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040@InterfaceAudience.Private 041public final class WALEventTrackerTableAccessor { 042 private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableAccessor.class); 043 044 public static final String RS_COLUMN = "region_server_name"; 045 public static final String WAL_NAME_COLUMN = "wal_name"; 046 public static final String TIMESTAMP_COLUMN = "timestamp"; 047 public static final String WAL_STATE_COLUMN = "wal_state"; 048 public static final String WAL_LENGTH_COLUMN = "wal_length"; 049 public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts"; 050 public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec"; 051 public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec"; 052 public static final int DEFAULT_MAX_ATTEMPTS = 3; 053 public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second 054 public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds 055 public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER"; 056 public static final String DELIMITER = "_"; 057 058 private WALEventTrackerTableAccessor() { 059 } 060 061 /** 062 * {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config - 063 * hbase.regionserver.wal.event.tracker.enabled 064 */ 065 public static final TableName WAL_EVENT_TRACKER_TABLE_NAME = 066 TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR); 067 068 private static void doPut(final Connection connection, final List<Put> puts) throws Exception { 069 RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create(); 070 while (true) { 071 try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) { 072 table.put(puts); 073 return; 074 } catch (IOException ioe) { 075 retryOrThrow(retryCounter, ioe); 076 } 077 retryCounter.sleepUntilNextRetry(); 078 } 079 } 080 081 private static RetryCounterFactory getRetryFactory(Configuration conf) { 082 int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS); 083 long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL); 084 long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME); 085 RetryCounter.RetryConfig retryConfig = 086 new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs, 087 TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit()); 088 return new RetryCounterFactory(retryConfig); 089 } 090 091 private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException { 092 if (retryCounter.shouldRetry()) { 093 return; 094 } 095 throw ioe; 096 } 097 098 /** 099 * Add wal event tracker rows to hbase:waleventtracker table 100 * @param walEventPayloads List of walevents to process 101 * @param connection Connection to use. 102 */ 103 public static void addWalEventTrackerRows(Queue<WALEventTrackerPayload> walEventPayloads, 104 final Connection connection) throws Exception { 105 List<Put> puts = new ArrayList<>(walEventPayloads.size()); 106 for (WALEventTrackerPayload payload : walEventPayloads) { 107 final byte[] rowKey = getRowKey(payload); 108 final Put put = new Put(rowKey); 109 // TODO Do we need to SKIP_WAL ? 110 put.setPriority(HConstants.NORMAL_QOS); 111 put 112 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(RS_COLUMN), 113 Bytes.toBytes(payload.getRsName())) 114 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_NAME_COLUMN), 115 Bytes.toBytes(payload.getWalName())) 116 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(TIMESTAMP_COLUMN), 117 Bytes.toBytes(payload.getTimeStamp())) 118 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_STATE_COLUMN), 119 Bytes.toBytes(payload.getState())) 120 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_LENGTH_COLUMN), 121 Bytes.toBytes(payload.getWalLength())); 122 puts.add(put); 123 } 124 doPut(connection, puts); 125 } 126 127 /** 128 * Create rowKey: 1. We want RS name to be the leading part of rowkey so that we can query by RS 129 * name filter. WAL name contains rs name as a leading part. 2. Timestamp when the event was 130 * generated. 3. Add state of the wal. Combination of 1 + 2 + 3 is definitely going to create a 131 * unique rowkey. 132 * @param payload payload to process 133 * @return rowKey byte[] 134 */ 135 public static byte[] getRowKey(final WALEventTrackerPayload payload) { 136 String walName = payload.getWalName(); 137 // converting to string since this will help seeing the timestamp in string format using 138 // hbase shell commands. 139 String timestampStr = String.valueOf(payload.getTimeStamp()); 140 String walState = payload.getState(); 141 final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState; 142 return Bytes.toBytes(rowKeyStr); 143 } 144}