/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Class which accumulates edits and separates them into a buffer per region while simultaneously
 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
 * pull region-specific buffers from this class.
 * <p>
 * Locking as used within this class: {@link #buffers} and the set of regions currently being
 * written are accessed while holding this object's monitor; {@link #totalBuffered} is read and
 * updated while holding the monitor of {@code controller.dataAvailable}, which is also the object
 * that appenders and writers wait/notify on.
 */
@InterfaceAudience.Private
public class EntryBuffers {
  private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);

  private final PipelineController controller;

  /** Pending per-region buffers, keyed by encoded region name (compared by byte content). */
  final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);

  /*
   * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
   * up bytes from a region if we're already writing data for that region in a different IO thread.
   */
  private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);

  /** Sum of the heap increments of appended entries that have not yet been reported as written. */
  protected long totalBuffered = 0;

  /** Threshold on {@link #totalBuffered} above which {@link #appendEntry} blocks. */
  protected final long maxHeapUsage;

  /**
   * @param controller pipeline controller whose {@code dataAvailable} monitor is used for
   *          waiting/notification and whose {@code thrown} reference is polled while waiting
   * @param maxHeapUsage threshold, in the same units as the buffered heap sizes, above which
   *          appenders wait
   */
  public EntryBuffers(PipelineController controller, long maxHeapUsage) {
    this.controller = controller;
    this.maxHeapUsage = maxHeapUsage;
  }

  /**
   * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
   * crossed the specified threshold.
   * <p>
   * The region buffer is created on first use. After the entry is added, the caller waits (in
   * 2 second slices) while {@link #totalBuffered} exceeds {@link #maxHeapUsage} and the
   * controller has recorded no error, then notifies all waiters on {@code dataAvailable}.
   * @param entry the WAL entry to buffer
   * @throws InterruptedException if the thread is interrupted while waiting for space
   * @throws IOException declared for {@code controller.checkForErrors()}, which is invoked last
   *           (presumably it rethrows an error recorded by another pipeline thread - confirm
   *           against PipelineController)
   */
  public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
    WALKey key = entry.getKey();
    RegionEntryBuffer buffer;
    long incrHeap;
    synchronized (this) {
      buffer = buffers.get(key.getEncodedRegionName());
      if (buffer == null) {
        buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
        buffers.put(key.getEncodedRegionName(), buffer);
      }
      incrHeap = buffer.appendEntry(entry);
    }

    // If we crossed the chunk threshold, wait for more space to be available
    synchronized (controller.dataAvailable) {
      totalBuffered += incrHeap;
      while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
        LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
        controller.dataAvailable.wait(2000);
      }
      // Wake any thread waiting on dataAvailable now that new data has been buffered.
      controller.dataAvailable.notifyAll();
    }
    controller.checkForErrors();
  }

  /**
   * Picks the pending buffer with the largest heap size whose region is not already being
   * written, removes it from the pending map and marks its region as being written.
   * Buffers whose heap size is zero are never selected.
   * @return RegionEntryBuffer a buffer of edits to be written, or {@code null} if no eligible
   *         buffer exists
   */
  synchronized RegionEntryBuffer getChunkToWrite() {
    long biggestSize = 0;
    byte[] biggestBufferKey = null;

    for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
      long size = entry.getValue().heapSize();
      if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
        biggestSize = size;
        biggestBufferKey = entry.getKey();
      }
    }
    if (biggestBufferKey == null) {
      return null;
    }

    RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
    currentlyWriting.add(biggestBufferKey);
    return buffer;
  }

  /**
   * Reports that a buffer previously handed out by {@link #getChunkToWrite()} has been written:
   * un-marks its region as being written, subtracts its heap size from {@link #totalBuffered}
   * and notifies all threads waiting on {@code dataAvailable}.
   * @param buffer the buffer that was written
   */
  void doneWriting(RegionEntryBuffer buffer) {
    synchronized (this) {
      boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
      assert removed;
    }
    long size = buffer.heapSize();

    synchronized (controller.dataAvailable) {
      totalBuffered -= size;
      // We may unblock writers
      controller.dataAvailable.notifyAll();
    }
  }

  /**
   * @param region encoded region name to look up
   * @return true if the region is currently marked as being written
   */
  @VisibleForTesting
  synchronized boolean isRegionCurrentlyWriting(byte[] region) {
    return currentlyWriting.contains(region);
  }

  /**
   * Blocks until {@link #totalBuffered} is no longer positive, re-checking at least every
   * 2 seconds. If the waiting thread is interrupted, a warning is logged, the thread's interrupt
   * status is restored and the method returns early without the buffers having drained.
   */
  public void waitUntilDrained() {
    synchronized (controller.dataAvailable) {
      while (totalBuffered > 0) {
        try {
          controller.dataAvailable.wait(2000);
        } catch (InterruptedException e) {
          LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
          // wait() cleared the interrupt flag when it threw; re-assert it so the caller can
          // observe the interruption (Thread.interrupted() would only clear it again).
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  /**
   * A buffer of some number of edits for a given region.
   * This accumulates edits and also provides a memory optimization in order to
   * share a single byte array instance for the table and region name.
   * Also tracks memory usage of the accumulated edits.
   */
  public static class RegionEntryBuffer implements HeapSize {
    /** Running total of the heap increments returned by {@link #appendEntry}. */
    private long heapInBuffer = 0;
    final List<WAL.Entry> entries;
    final TableName tableName;
    final byte[] encodedRegionName;

    /**
     * @param tableName table the region belongs to
     * @param region encoded region name; the array is stored as-is, not copied
     */
    RegionEntryBuffer(TableName tableName, byte[] region) {
      this.tableName = tableName;
      this.encodedRegionName = region;
      this.entries = new ArrayList<>();
    }

    /**
     * Adds an entry to this buffer after pointing its key at this buffer's shared table and
     * region name instances.
     * @param entry the entry to add
     * @return the amount by which this buffer's accounted heap size grew: the edit's heap size
     *         plus the aligned size of two references
     */
    long appendEntry(WAL.Entry entry) {
      internify(entry);
      entries.add(entry);
      // TODO linkedlist entry
      long incrHeap = entry.getEdit().heapSize() +
        ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers
      heapInBuffer += incrHeap;
      return incrHeap;
    }

    /** Passes this buffer's table name and encoded region name to the entry key's intern hooks. */
    private void internify(WAL.Entry entry) {
      WALKeyImpl k = entry.getKey();
      k.internTableName(this.tableName);
      k.internEncodedRegionName(this.encodedRegionName);
    }

    /** @return the accumulated heap size accounted for the entries appended so far */
    @Override
    public long heapSize() {
      return heapInBuffer;
    }

    /** @return the encoded region name array this buffer was created with (not a copy) */
    public byte[] getEncodedRegionName() {
      return encodedRegionName;
    }

    /** @return the table name this buffer was created with */
    public TableName getTableName() {
      return tableName;
    }

    /** @return the live, mutable list of buffered entries (not a copy) */
    public List<WAL.Entry> getEntries() {
      return this.entries;
    }
  }
}