001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.TreeMap;
026import java.util.TreeSet;
027
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.io.HeapSize;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.hadoop.hbase.util.ClassSize;
032import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Class which accumulates edits and separates them into a buffer per region while simultaneously
039 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
040 * pull region-specific buffers from this class.
041 */
042@InterfaceAudience.Private
043public class EntryBuffers {
044  private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
045
046  private final PipelineController controller;
047
048  final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
049
050  /*
051   * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
052   * up bytes from a region if we're already writing data for that region in a different IO thread.
053   */
054  private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
055
056  protected long totalBuffered = 0;
057  protected final long maxHeapUsage;
058
059  public EntryBuffers(PipelineController controller, long maxHeapUsage) {
060    this.controller = controller;
061    this.maxHeapUsage = maxHeapUsage;
062  }
063
064  /**
065   * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
066   * crossed the specified threshold.
067   */
068  public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
069    WALKey key = entry.getKey();
070    RegionEntryBuffer buffer;
071    long incrHeap;
072    synchronized (this) {
073      buffer = buffers.get(key.getEncodedRegionName());
074      if (buffer == null) {
075        buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
076        buffers.put(key.getEncodedRegionName(), buffer);
077      }
078      incrHeap = buffer.appendEntry(entry);
079    }
080
081    // If we crossed the chunk threshold, wait for more space to be available
082    synchronized (controller.dataAvailable) {
083      totalBuffered += incrHeap;
084      while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
085        LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
086        controller.dataAvailable.wait(2000);
087      }
088      controller.dataAvailable.notifyAll();
089    }
090    controller.checkForErrors();
091  }
092
093  /**
094   * @return RegionEntryBuffer a buffer of edits to be written.
095   */
096  synchronized RegionEntryBuffer getChunkToWrite() {
097    long biggestSize = 0;
098    byte[] biggestBufferKey = null;
099
100    for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
101      long size = entry.getValue().heapSize();
102      if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
103        biggestSize = size;
104        biggestBufferKey = entry.getKey();
105      }
106    }
107    if (biggestBufferKey == null) {
108      return null;
109    }
110
111    RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
112    currentlyWriting.add(biggestBufferKey);
113    return buffer;
114  }
115
116  void doneWriting(RegionEntryBuffer buffer) {
117    synchronized (this) {
118      boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
119      assert removed;
120    }
121    long size = buffer.heapSize();
122
123    synchronized (controller.dataAvailable) {
124      totalBuffered -= size;
125      // We may unblock writers
126      controller.dataAvailable.notifyAll();
127    }
128  }
129
130  synchronized boolean isRegionCurrentlyWriting(byte[] region) {
131    return currentlyWriting.contains(region);
132  }
133
134  public void waitUntilDrained() {
135    synchronized (controller.dataAvailable) {
136      while (totalBuffered > 0) {
137        try {
138          controller.dataAvailable.wait(2000);
139        } catch (InterruptedException e) {
140          LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
141          Thread.interrupted();
142          break;
143        }
144      }
145    }
146  }
147
148  /**
149   * A buffer of some number of edits for a given region.
150   * This accumulates edits and also provides a memory optimization in order to
151   * share a single byte array instance for the table and region name.
152   * Also tracks memory usage of the accumulated edits.
153   */
154  public static class RegionEntryBuffer implements HeapSize {
155    private long heapInBuffer = 0;
156    final List<WAL.Entry> entries;
157    final TableName tableName;
158    final byte[] encodedRegionName;
159
160    RegionEntryBuffer(TableName tableName, byte[] region) {
161      this.tableName = tableName;
162      this.encodedRegionName = region;
163      this.entries = new ArrayList<>();
164    }
165
166    long appendEntry(WAL.Entry entry) {
167      internify(entry);
168      entries.add(entry);
169      // TODO linkedlist entry
170      long incrHeap = entry.getEdit().heapSize() +
171          ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers
172      heapInBuffer += incrHeap;
173      return incrHeap;
174    }
175
176    private void internify(WAL.Entry entry) {
177      WALKeyImpl k = entry.getKey();
178      k.internTableName(this.tableName);
179      k.internEncodedRegionName(this.encodedRegionName);
180    }
181
182    @Override
183    public long heapSize() {
184      return heapInBuffer;
185    }
186
187    public byte[] getEncodedRegionName() {
188      return encodedRegionName;
189    }
190
191    public TableName getTableName() {
192      return tableName;
193    }
194
195    public List<WAL.Entry> getEntries() {
196      return this.entries;
197    }
198  }
199}