001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.TreeMap;
026import java.util.TreeSet;
027
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.io.HeapSize;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.hadoop.hbase.util.ClassSize;
032import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
038
039/**
040 * Class which accumulates edits and separates them into a buffer per region while simultaneously
041 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
042 * pull region-specific buffers from this class.
043 */
044@InterfaceAudience.Private
045class EntryBuffers {
046  private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
047
048  private final PipelineController controller;
049
050  final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
051
052  /*
053   * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
054   * up bytes from a region if we're already writing data for that region in a different IO thread.
055   */
056  private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
057
058  protected long totalBuffered = 0;
059  protected final long maxHeapUsage;
060
061  public EntryBuffers(PipelineController controller, long maxHeapUsage) {
062    this.controller = controller;
063    this.maxHeapUsage = maxHeapUsage;
064  }
065
066  /**
067   * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
068   * crossed the specified threshold.
069   */
070  void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
071    WALKey key = entry.getKey();
072    RegionEntryBuffer buffer;
073    long incrHeap;
074    synchronized (this) {
075      buffer = buffers.get(key.getEncodedRegionName());
076      if (buffer == null) {
077        buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
078        buffers.put(key.getEncodedRegionName(), buffer);
079      }
080      incrHeap = buffer.appendEntry(entry);
081    }
082
083    // If we crossed the chunk threshold, wait for more space to be available
084    synchronized (controller.dataAvailable) {
085      totalBuffered += incrHeap;
086      while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
087        LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
088        controller.dataAvailable.wait(2000);
089      }
090      controller.dataAvailable.notifyAll();
091    }
092    controller.checkForErrors();
093  }
094
095  /**
096   * @return RegionEntryBuffer a buffer of edits to be written.
097   */
098  synchronized RegionEntryBuffer getChunkToWrite() {
099    long biggestSize = 0;
100    byte[] biggestBufferKey = null;
101
102    for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
103      long size = entry.getValue().heapSize();
104      if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
105        biggestSize = size;
106        biggestBufferKey = entry.getKey();
107      }
108    }
109    if (biggestBufferKey == null) {
110      return null;
111    }
112
113    RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
114    currentlyWriting.add(biggestBufferKey);
115    return buffer;
116  }
117
118  void doneWriting(RegionEntryBuffer buffer) {
119    synchronized (this) {
120      boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
121      assert removed;
122    }
123    long size = buffer.heapSize();
124
125    synchronized (controller.dataAvailable) {
126      totalBuffered -= size;
127      // We may unblock writers
128      controller.dataAvailable.notifyAll();
129    }
130  }
131
132  @VisibleForTesting
133  synchronized boolean isRegionCurrentlyWriting(byte[] region) {
134    return currentlyWriting.contains(region);
135  }
136
137  /**
138   * A buffer of some number of edits for a given region.
139   * This accumulates edits and also provides a memory optimization in order to
140   * share a single byte array instance for the table and region name.
141   * Also tracks memory usage of the accumulated edits.
142   */
143  static class RegionEntryBuffer implements HeapSize {
144    private long heapInBuffer = 0;
145    final List<WAL.Entry> entryBuffer;
146    final TableName tableName;
147    final byte[] encodedRegionName;
148
149    RegionEntryBuffer(TableName tableName, byte[] region) {
150      this.tableName = tableName;
151      this.encodedRegionName = region;
152      this.entryBuffer = new ArrayList<>();
153    }
154
155    long appendEntry(WAL.Entry entry) {
156      internify(entry);
157      entryBuffer.add(entry);
158      // TODO linkedlist entry
159      long incrHeap = entry.getEdit().heapSize() +
160          ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers
161      heapInBuffer += incrHeap;
162      return incrHeap;
163    }
164
165    private void internify(WAL.Entry entry) {
166      WALKeyImpl k = entry.getKey();
167      k.internTableName(this.tableName);
168      k.internEncodedRegionName(this.encodedRegionName);
169    }
170
171    @Override
172    public long heapSize() {
173      return heapInBuffer;
174    }
175
176    public byte[] getEncodedRegionName() {
177      return encodedRegionName;
178    }
179
180    public TableName getTableName() {
181      return tableName;
182    }
183  }
184}