001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.Map;
022import java.util.Set;
023import java.util.TreeMap;
024import java.util.TreeSet;
025
026import org.apache.hadoop.hbase.util.Bytes;
027import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
028import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Class which accumulates edits and separates them into a buffer per region while simultaneously
036 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
037 * pull region-specific buffers from this class.
038 */
039@InterfaceAudience.Private
040public class EntryBuffers {
041  private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
042
043  PipelineController controller;
044
045  Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
046
047  /*
048   * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
049   * up bytes from a region if we're already writing data for that region in a different IO thread.
050   */
051  Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
052
053  long totalBuffered = 0;
054  long maxHeapUsage;
055  boolean splitWriterCreationBounded;
056
057  public EntryBuffers(PipelineController controller, long maxHeapUsage) {
058    this(controller, maxHeapUsage, false);
059  }
060
061  public EntryBuffers(PipelineController controller, long maxHeapUsage,
062      boolean splitWriterCreationBounded) {
063    this.controller = controller;
064    this.maxHeapUsage = maxHeapUsage;
065    this.splitWriterCreationBounded = splitWriterCreationBounded;
066  }
067
068  /**
069   * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
070   * crossed the specified threshold.
071   */
072  public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
073    WALKey key = entry.getKey();
074    RegionEntryBuffer buffer;
075    long incrHeap;
076    synchronized (this) {
077      buffer = buffers.get(key.getEncodedRegionName());
078      if (buffer == null) {
079        buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
080        buffers.put(key.getEncodedRegionName(), buffer);
081      }
082      incrHeap = buffer.appendEntry(entry);
083    }
084
085    // If we crossed the chunk threshold, wait for more space to be available
086    synchronized (controller.dataAvailable) {
087      totalBuffered += incrHeap;
088      while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
089        LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
090        controller.dataAvailable.wait(2000);
091      }
092      controller.dataAvailable.notifyAll();
093    }
094    controller.checkForErrors();
095  }
096
097  /**
098   * @return RegionEntryBuffer a buffer of edits to be written.
099   */
100  synchronized RegionEntryBuffer getChunkToWrite() {
101    // The core part of limiting opening writers is it doesn't return chunk only if the
102    // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
103    // region during splitting. It will flush all the logs in the buffer after splitting
104    // through a threadpool, which means the number of writers it created is under control.
105    if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
106      return null;
107    }
108    long biggestSize = 0;
109    byte[] biggestBufferKey = null;
110
111    for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
112      long size = entry.getValue().heapSize();
113      if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
114        biggestSize = size;
115        biggestBufferKey = entry.getKey();
116      }
117    }
118    if (biggestBufferKey == null) {
119      return null;
120    }
121
122    RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
123    currentlyWriting.add(biggestBufferKey);
124    return buffer;
125  }
126
127  void doneWriting(RegionEntryBuffer buffer) {
128    synchronized (this) {
129      boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
130      assert removed;
131    }
132    long size = buffer.heapSize();
133
134    synchronized (controller.dataAvailable) {
135      totalBuffered -= size;
136      // We may unblock writers
137      controller.dataAvailable.notifyAll();
138    }
139  }
140
141  synchronized boolean isRegionCurrentlyWriting(byte[] region) {
142    return currentlyWriting.contains(region);
143  }
144
145  public void waitUntilDrained() {
146    synchronized (controller.dataAvailable) {
147      while (totalBuffered > 0) {
148        try {
149          controller.dataAvailable.wait(2000);
150        } catch (InterruptedException e) {
151          LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
152          Thread.interrupted();
153          break;
154        }
155      }
156    }
157  }
158}