001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_INFO_FAMILY; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Queue; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.RetryCounter; 035import org.apache.hadoop.hbase.util.RetryCounterFactory; 036import org.apache.yetus.audience.InterfaceAudience; 037 038@InterfaceAudience.Private 039public final class WALEventTrackerTableAccessor { 040 public static final String RS_COLUMN = "region_server_name"; 041 public static final String WAL_NAME_COLUMN = "wal_name"; 042 public static final String TIMESTAMP_COLUMN = "timestamp"; 043 public static final String WAL_STATE_COLUMN = "wal_state"; 044 public static final String WAL_LENGTH_COLUMN = "wal_length"; 045 public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts"; 046 public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec"; 047 public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec"; 048 public static final int DEFAULT_MAX_ATTEMPTS = 3; 049 public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second 050 public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds 051 public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER"; 052 public static final String DELIMITER = "_"; 053 054 private WALEventTrackerTableAccessor() { 055 } 056 057 /** 058 * {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config - 059 * hbase.regionserver.wal.event.tracker.enabled 060 */ 061 public static final TableName WAL_EVENT_TRACKER_TABLE_NAME = 062 TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR); 063 064 private static void doPut(final Connection connection, final List<Put> puts) throws Exception { 065 RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create(); 066 while (true) { 067 try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) { 068 table.put(puts); 069 return; 070 } catch (IOException ioe) { 071 retryOrThrow(retryCounter, ioe); 072 } 073 retryCounter.sleepUntilNextRetry(); 074 } 075 } 076 077 private static RetryCounterFactory getRetryFactory(Configuration conf) { 078 int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS); 079 long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL); 080 long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME); 081 RetryCounter.RetryConfig retryConfig = 082 new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs, 083 TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit()); 084 return new RetryCounterFactory(retryConfig); 085 } 086 087 private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException { 088 if (retryCounter.shouldRetry()) { 089 return; 090 } 091 throw ioe; 092 } 093 094 /** 095 * Add wal event tracker rows to hbase:waleventtracker table 096 * @param walEventPayloads List of walevents to process 097 * @param connection Connection to use. 098 */ 099 public static void addWalEventTrackerRows(Queue<WALEventTrackerPayload> walEventPayloads, 100 final Connection connection) throws Exception { 101 List<Put> puts = new ArrayList<>(walEventPayloads.size()); 102 for (WALEventTrackerPayload payload : walEventPayloads) { 103 final byte[] rowKey = getRowKey(payload); 104 final Put put = new Put(rowKey); 105 // TODO Do we need to SKIP_WAL ? 106 put.setPriority(HConstants.NORMAL_QOS); 107 put 108 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(RS_COLUMN), 109 Bytes.toBytes(payload.getRsName())) 110 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_NAME_COLUMN), 111 Bytes.toBytes(payload.getWalName())) 112 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(TIMESTAMP_COLUMN), 113 Bytes.toBytes(payload.getTimeStamp())) 114 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_STATE_COLUMN), 115 Bytes.toBytes(payload.getState())) 116 .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_LENGTH_COLUMN), 117 Bytes.toBytes(payload.getWalLength())); 118 puts.add(put); 119 } 120 doPut(connection, puts); 121 } 122 123 /** 124 * Create rowKey: 1. We want RS name to be the leading part of rowkey so that we can query by RS 125 * name filter. WAL name contains rs name as a leading part. 2. Timestamp when the event was 126 * generated. 3. Add state of the wal. Combination of 1 + 2 + 3 is definitely going to create a 127 * unique rowkey. 128 * @param payload payload to process 129 * @return rowKey byte[] 130 */ 131 public static byte[] getRowKey(final WALEventTrackerPayload payload) { 132 String walName = payload.getWalName(); 133 // converting to string since this will help seeing the timestamp in string format using 134 // hbase shell commands. 135 String timestampStr = String.valueOf(payload.getTimeStamp()); 136 String walState = payload.getState(); 137 final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState; 138 return Bytes.toBytes(rowKeyStr); 139 } 140}