001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.TreeMap;
026import java.util.TreeSet;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.io.HeapSize;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ClassSize;
031import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * Class which accumulates edits and separates them into a buffer per region while simultaneously
038 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
039 * pull region-specific buffers from this class.
040 */
041@InterfaceAudience.Private
042public class EntryBuffers {
043  private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
044
045  private final PipelineController controller;
046
047  final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
048
049  /*
050   * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
051   * up bytes from a region if we're already writing data for that region in a different IO thread.
052   */
053  private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
054
055  protected long totalBuffered = 0;
056  protected final long maxHeapUsage;
057
058  public EntryBuffers(PipelineController controller, long maxHeapUsage) {
059    this.controller = controller;
060    this.maxHeapUsage = maxHeapUsage;
061  }
062
063  /**
064   * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
065   * crossed the specified threshold.
066   */
067  public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
068    WALKey key = entry.getKey();
069    RegionEntryBuffer buffer;
070    long incrHeap;
071    synchronized (this) {
072      buffer = buffers.get(key.getEncodedRegionName());
073      if (buffer == null) {
074        buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
075        buffers.put(key.getEncodedRegionName(), buffer);
076      }
077      incrHeap = buffer.appendEntry(entry);
078    }
079
080    // If we crossed the chunk threshold, wait for more space to be available
081    synchronized (controller.dataAvailable) {
082      totalBuffered += incrHeap;
083      while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
084        LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
085        controller.dataAvailable.wait(2000);
086      }
087      controller.dataAvailable.notifyAll();
088    }
089    controller.checkForErrors();
090  }
091
092  /** Returns RegionEntryBuffer a buffer of edits to be written. */
093  synchronized RegionEntryBuffer getChunkToWrite() {
094    long biggestSize = 0;
095    byte[] biggestBufferKey = null;
096
097    for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
098      long size = entry.getValue().heapSize();
099      if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
100        biggestSize = size;
101        biggestBufferKey = entry.getKey();
102      }
103    }
104    if (biggestBufferKey == null) {
105      return null;
106    }
107
108    RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
109    currentlyWriting.add(biggestBufferKey);
110    return buffer;
111  }
112
113  void doneWriting(RegionEntryBuffer buffer) {
114    synchronized (this) {
115      boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
116      assert removed;
117    }
118    long size = buffer.heapSize();
119
120    synchronized (controller.dataAvailable) {
121      totalBuffered -= size;
122      // We may unblock writers
123      controller.dataAvailable.notifyAll();
124    }
125  }
126
127  synchronized boolean isRegionCurrentlyWriting(byte[] region) {
128    return currentlyWriting.contains(region);
129  }
130
131  public void waitUntilDrained() {
132    synchronized (controller.dataAvailable) {
133      while (totalBuffered > 0) {
134        try {
135          controller.dataAvailable.wait(2000);
136        } catch (InterruptedException e) {
137          LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
138          Thread.interrupted();
139          break;
140        }
141      }
142    }
143  }
144
145  /**
146   * A buffer of some number of edits for a given region. This accumulates edits and also provides a
147   * memory optimization in order to share a single byte array instance for the table and region
148   * name. Also tracks memory usage of the accumulated edits.
149   */
150  public static class RegionEntryBuffer implements HeapSize {
151    private long heapInBuffer = 0;
152    final List<WAL.Entry> entries;
153    final TableName tableName;
154    final byte[] encodedRegionName;
155
156    RegionEntryBuffer(TableName tableName, byte[] region) {
157      this.tableName = tableName;
158      this.encodedRegionName = region;
159      this.entries = new ArrayList<>();
160    }
161
162    long appendEntry(WAL.Entry entry) {
163      internify(entry);
164      entries.add(entry);
165      // TODO linkedlist entry
166      // entry size plus WALKey pointers
167      long incrHeap = entry.getEdit().heapSize() + ClassSize.align(2 * ClassSize.REFERENCE);
168      heapInBuffer += incrHeap;
169      return incrHeap;
170    }
171
172    private void internify(WAL.Entry entry) {
173      WALKeyImpl k = entry.getKey();
174      k.internTableName(this.tableName);
175      k.internEncodedRegionName(this.encodedRegionName);
176    }
177
178    @Override
179    public long heapSize() {
180      return heapInBuffer;
181    }
182
183    public byte[] getEncodedRegionName() {
184      return encodedRegionName;
185    }
186
187    public TableName getTableName() {
188      return tableName;
189    }
190
191    public List<WAL.Entry> getEntries() {
192      return this.entries;
193    }
194  }
195}