001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.TreeMap; 026import java.util.TreeSet; 027 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.io.HeapSize; 030import org.apache.hadoop.hbase.util.Bytes; 031import org.apache.hadoop.hbase.util.ClassSize; 032import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Class which accumulates edits and separates them into a buffer per region while simultaneously 039 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then 040 * pull region-specific buffers from this class. 041 */ 042@InterfaceAudience.Private 043public class EntryBuffers { 044 private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class); 045 046 private final PipelineController controller; 047 048 final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR); 049 050 /* 051 * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick 052 * up bytes from a region if we're already writing data for that region in a different IO thread. 053 */ 054 private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR); 055 056 protected long totalBuffered = 0; 057 protected final long maxHeapUsage; 058 059 public EntryBuffers(PipelineController controller, long maxHeapUsage) { 060 this.controller = controller; 061 this.maxHeapUsage = maxHeapUsage; 062 } 063 064 /** 065 * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has 066 * crossed the specified threshold. 067 */ 068 public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException { 069 WALKey key = entry.getKey(); 070 RegionEntryBuffer buffer; 071 long incrHeap; 072 synchronized (this) { 073 buffer = buffers.get(key.getEncodedRegionName()); 074 if (buffer == null) { 075 buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName()); 076 buffers.put(key.getEncodedRegionName(), buffer); 077 } 078 incrHeap = buffer.appendEntry(entry); 079 } 080 081 // If we crossed the chunk threshold, wait for more space to be available 082 synchronized (controller.dataAvailable) { 083 totalBuffered += incrHeap; 084 while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { 085 LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered); 086 controller.dataAvailable.wait(2000); 087 } 088 controller.dataAvailable.notifyAll(); 089 } 090 controller.checkForErrors(); 091 } 092 093 /** 094 * @return RegionEntryBuffer a buffer of edits to be written. 095 */ 096 synchronized RegionEntryBuffer getChunkToWrite() { 097 long biggestSize = 0; 098 byte[] biggestBufferKey = null; 099 100 for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) { 101 long size = entry.getValue().heapSize(); 102 if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) { 103 biggestSize = size; 104 biggestBufferKey = entry.getKey(); 105 } 106 } 107 if (biggestBufferKey == null) { 108 return null; 109 } 110 111 RegionEntryBuffer buffer = buffers.remove(biggestBufferKey); 112 currentlyWriting.add(biggestBufferKey); 113 return buffer; 114 } 115 116 void doneWriting(RegionEntryBuffer buffer) { 117 synchronized (this) { 118 boolean removed = currentlyWriting.remove(buffer.encodedRegionName); 119 assert removed; 120 } 121 long size = buffer.heapSize(); 122 123 synchronized (controller.dataAvailable) { 124 totalBuffered -= size; 125 // We may unblock writers 126 controller.dataAvailable.notifyAll(); 127 } 128 } 129 130 synchronized boolean isRegionCurrentlyWriting(byte[] region) { 131 return currentlyWriting.contains(region); 132 } 133 134 public void waitUntilDrained() { 135 synchronized (controller.dataAvailable) { 136 while (totalBuffered > 0) { 137 try { 138 controller.dataAvailable.wait(2000); 139 } catch (InterruptedException e) { 140 LOG.warn("Got interrupted while waiting for EntryBuffers is drained"); 141 Thread.interrupted(); 142 break; 143 } 144 } 145 } 146 } 147 148 /** 149 * A buffer of some number of edits for a given region. 150 * This accumulates edits and also provides a memory optimization in order to 151 * share a single byte array instance for the table and region name. 152 * Also tracks memory usage of the accumulated edits. 153 */ 154 public static class RegionEntryBuffer implements HeapSize { 155 private long heapInBuffer = 0; 156 final List<WAL.Entry> entries; 157 final TableName tableName; 158 final byte[] encodedRegionName; 159 160 RegionEntryBuffer(TableName tableName, byte[] region) { 161 this.tableName = tableName; 162 this.encodedRegionName = region; 163 this.entries = new ArrayList<>(); 164 } 165 166 long appendEntry(WAL.Entry entry) { 167 internify(entry); 168 entries.add(entry); 169 // TODO linkedlist entry 170 long incrHeap = entry.getEdit().heapSize() + 171 ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers 172 heapInBuffer += incrHeap; 173 return incrHeap; 174 } 175 176 private void internify(WAL.Entry entry) { 177 WALKeyImpl k = entry.getKey(); 178 k.internTableName(this.tableName); 179 k.internEncodedRegionName(this.encodedRegionName); 180 } 181 182 @Override 183 public long heapSize() { 184 return heapInBuffer; 185 } 186 187 public byte[] getEncodedRegionName() { 188 return encodedRegionName; 189 } 190 191 public TableName getTableName() { 192 return tableName; 193 } 194 195 public List<WAL.Entry> getEntries() { 196 return this.entries; 197 } 198 } 199}