001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
021import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
022
023import java.util.ArrayDeque;
024import java.util.Iterator;
025import java.util.Queue;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
030import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
036
037/*
038  This class provides the queue to save Wal events from backing RingBuffer.
039 */
040@InterfaceAudience.Private
041public class WALEventTrackerQueueService implements NamedQueueService {
042
043  private EvictingQueue<WALEventTrackerPayload> queue;
044  private static final String WAL_EVENT_TRACKER_RING_BUFFER_SIZE =
045    "hbase.regionserver.wal.event.tracker.ringbuffer.size";
046  private final boolean walEventTrackerEnabled;
047  private int queueSize;
048  private MetricsWALEventTrackerSource source = null;
049
050  private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerQueueService.class);
051
052  public WALEventTrackerQueueService(Configuration conf) {
053    this(conf, null);
054  }
055
056  public WALEventTrackerQueueService(Configuration conf, MetricsWALEventTrackerSource source) {
057    this.walEventTrackerEnabled =
058      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
059    if (!walEventTrackerEnabled) {
060      return;
061    }
062
063    this.queueSize = conf.getInt(WAL_EVENT_TRACKER_RING_BUFFER_SIZE, 256);
064    queue = EvictingQueue.create(queueSize);
065    if (source == null) {
066      this.source = CompatibilitySingletonFactory.getInstance(MetricsWALEventTrackerSource.class);
067    } else {
068      this.source = source;
069    }
070  }
071
072  @Override
073  public NamedQueuePayload.NamedQueueEvent getEvent() {
074    return NamedQueuePayload.NamedQueueEvent.WAL_EVENT_TRACKER;
075  }
076
077  @Override
078  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
079    if (!walEventTrackerEnabled) {
080      return;
081    }
082    if (!(namedQueuePayload instanceof WALEventTrackerPayload)) {
083      LOG.warn("WALEventTrackerQueueService: NamedQueuePayload is not of type"
084        + " WALEventTrackerPayload.");
085      return;
086    }
087
088    WALEventTrackerPayload payload = (WALEventTrackerPayload) namedQueuePayload;
089    if (LOG.isDebugEnabled()) {
090      LOG.debug("Adding wal event tracker payload " + payload);
091    }
092    addToQueue(payload);
093  }
094
095  /*
096   * Made it default to use it in testing.
097   */
098  synchronized void addToQueue(WALEventTrackerPayload payload) {
099    queue.add(payload);
100  }
101
102  @Override
103  public boolean clearNamedQueue() {
104    if (!walEventTrackerEnabled) {
105      return false;
106    }
107    LOG.debug("Clearing wal event tracker queue");
108    queue.clear();
109    return true;
110  }
111
112  @Override
113  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
114    return null;
115  }
116
117  @Override
118  public void persistAll(Connection connection) {
119    if (!walEventTrackerEnabled) {
120      return;
121    }
122    if (queue.isEmpty()) {
123      LOG.debug("Wal Event tracker queue is empty.");
124      return;
125    }
126
127    Queue<WALEventTrackerPayload> queue = getWALEventTrackerList();
128    try {
129      WALEventTrackerTableAccessor.addWalEventTrackerRows(queue, connection);
130    } catch (Exception ioe) {
131      // If we fail to persist the records with retries then just forget about them.
132      // This is a best effort service.
133      LOG.error("Failed while persisting wal tracker records", ioe);
134      // Increment metrics for failed puts
135      source.incrFailedPuts(queue.size());
136    }
137  }
138
139  private synchronized Queue<WALEventTrackerPayload> getWALEventTrackerList() {
140    Queue<WALEventTrackerPayload> retQueue = new ArrayDeque<>();
141    Iterator<WALEventTrackerPayload> iterator = queue.iterator();
142    while (iterator.hasNext()) {
143      retQueue.add(iterator.next());
144    }
145    queue.clear();
146    return retQueue;
147  }
148}