001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT; 021import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; 022 023import java.util.ArrayDeque; 024import java.util.Iterator; 025import java.util.Queue; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 030import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; 036 037/* 038 This class provides the queue to save Wal events from backing RingBuffer. 039 */ 040@InterfaceAudience.Private 041public class WALEventTrackerQueueService implements NamedQueueService { 042 043 private EvictingQueue<WALEventTrackerPayload> queue; 044 private static final String WAL_EVENT_TRACKER_RING_BUFFER_SIZE = 045 "hbase.regionserver.wal.event.tracker.ringbuffer.size"; 046 private final boolean walEventTrackerEnabled; 047 private int queueSize; 048 private MetricsWALEventTrackerSource source = null; 049 050 private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerQueueService.class); 051 052 public WALEventTrackerQueueService(Configuration conf) { 053 this(conf, null); 054 } 055 056 public WALEventTrackerQueueService(Configuration conf, MetricsWALEventTrackerSource source) { 057 this.walEventTrackerEnabled = 058 conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); 059 if (!walEventTrackerEnabled) { 060 return; 061 } 062 063 this.queueSize = conf.getInt(WAL_EVENT_TRACKER_RING_BUFFER_SIZE, 256); 064 queue = EvictingQueue.create(queueSize); 065 if (source == null) { 066 this.source = CompatibilitySingletonFactory.getInstance(MetricsWALEventTrackerSource.class); 067 } else { 068 this.source = source; 069 } 070 } 071 072 @Override 073 public NamedQueuePayload.NamedQueueEvent getEvent() { 074 return NamedQueuePayload.NamedQueueEvent.WAL_EVENT_TRACKER; 075 } 076 077 @Override 078 public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { 079 if (!walEventTrackerEnabled) { 080 return; 081 } 082 if (!(namedQueuePayload instanceof WALEventTrackerPayload)) { 083 LOG.warn("WALEventTrackerQueueService: NamedQueuePayload is not of type" 084 + " WALEventTrackerPayload."); 085 return; 086 } 087 088 WALEventTrackerPayload payload = (WALEventTrackerPayload) namedQueuePayload; 089 if (LOG.isDebugEnabled()) { 090 LOG.debug("Adding wal event tracker payload " + payload); 091 } 092 addToQueue(payload); 093 } 094 095 /* 096 * Made it default to use it in testing. 097 */ 098 synchronized void addToQueue(WALEventTrackerPayload payload) { 099 queue.add(payload); 100 } 101 102 @Override 103 public boolean clearNamedQueue() { 104 if (!walEventTrackerEnabled) { 105 return false; 106 } 107 LOG.debug("Clearing wal event tracker queue"); 108 queue.clear(); 109 return true; 110 } 111 112 @Override 113 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 114 return null; 115 } 116 117 @Override 118 public void persistAll(Connection connection) { 119 if (!walEventTrackerEnabled) { 120 return; 121 } 122 if (queue.isEmpty()) { 123 LOG.debug("Wal Event tracker queue is empty."); 124 return; 125 } 126 127 Queue<WALEventTrackerPayload> queue = getWALEventTrackerList(); 128 try { 129 WALEventTrackerTableAccessor.addWalEventTrackerRows(queue, connection); 130 } catch (Exception ioe) { 131 // If we fail to persist the records with retries then just forget about them. 132 // This is a best effort service. 133 LOG.error("Failed while persisting wal tracker records", ioe); 134 // Increment metrics for failed puts 135 source.incrFailedPuts(queue.size()); 136 } 137 } 138 139 private synchronized Queue<WALEventTrackerPayload> getWALEventTrackerList() { 140 Queue<WALEventTrackerPayload> retQueue = new ArrayDeque<>(); 141 Iterator<WALEventTrackerPayload> iterator = queue.iterator(); 142 while (iterator.hasNext()) { 143 retQueue.add(iterator.next()); 144 } 145 queue.clear(); 146 return retQueue; 147 } 148}