/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class which accumulates edits and separates them into a buffer per region while simultaneously
 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
 * pull region-specific buffers from this class.
 * <p>
 * Two monitors are used: the {@code EntryBuffers} instance itself guards {@link #buffers} and the
 * set of regions currently being written, while {@code controller.dataAvailable} guards
 * {@link #totalBuffered} and is the object producers and consumers wait on / notify.
 */
@InterfaceAudience.Private
public class EntryBuffers {
  private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);

  private final PipelineController controller;

  /** Pending edits keyed by encoded region name; guarded by {@code this}. */
  final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);

  /*
   * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
   * up bytes from a region if we're already writing data for that region in a different IO thread.
   * Guarded by this.
   */
  private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);

  /** Heap bytes currently accounted to buffered edits; guarded by controller.dataAvailable. */
  protected long totalBuffered = 0;

  /** Threshold of {@link #totalBuffered} above which {@link #appendEntry} blocks. */
  protected final long maxHeapUsage;

  /**
   * @param controller   pipeline controller whose {@code dataAvailable} monitor and {@code thrown}
   *                     error holder are used for coordination
   * @param maxHeapUsage number of buffered heap bytes above which appenders block
   */
  public EntryBuffers(PipelineController controller, long maxHeapUsage) {
    this.controller = controller;
    this.maxHeapUsage = maxHeapUsage;
  }

  /**
   * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
   * crossed the specified threshold.
   * @param entry the WAL entry to buffer; its key supplies the table and encoded region name
   * @throws InterruptedException if the thread is interrupted while waiting for buffer space
   * @throws IOException declared for {@code controller.checkForErrors()}; presumably surfaces an
   *           error recorded by another pipeline thread (confirm against PipelineController)
   */
  public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
    WALKey key = entry.getKey();
    byte[] regionName = key.getEncodedRegionName();
    RegionEntryBuffer buffer;
    long incrHeap;
    synchronized (this) {
      buffer = buffers.get(regionName);
      if (buffer == null) {
        // First edit seen for this region since its last buffer was handed to a writer.
        buffer = new RegionEntryBuffer(key.getTableName(), regionName);
        buffers.put(regionName, buffer);
      }
      incrHeap = buffer.appendEntry(entry);
    }

    // If we crossed the chunk threshold, wait for more space to be available
    synchronized (controller.dataAvailable) {
      totalBuffered += incrHeap;
      // Stop waiting as soon as an error has been recorded so we do not block forever.
      while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
        LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
        controller.dataAvailable.wait(2000);
      }
      // Wake any threads waiting on the monitor now that new data has been buffered.
      controller.dataAvailable.notifyAll();
    }
    controller.checkForErrors();
  }

  /**
   * Picks the region buffer with the largest accounted heap size among regions that are not
   * currently being written, removes it from the pending map and marks its region as being
   * written.
   * @return the selected buffer, or {@code null} if no eligible non-empty buffer exists
   */
  synchronized RegionEntryBuffer getChunkToWrite() {
    long biggestSize = 0;
    byte[] biggestBufferKey = null;

    for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
      long size = entry.getValue().heapSize();
      if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
        biggestSize = size;
        biggestBufferKey = entry.getKey();
      }
    }
    if (biggestBufferKey == null) {
      return null;
    }

    RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
    currentlyWriting.add(biggestBufferKey);
    return buffer;
  }

  /**
   * Marks the buffer's region as no longer being written, subtracts the buffer's heap size from
   * {@link #totalBuffered} and notifies all threads waiting on {@code controller.dataAvailable}.
   * @param buffer a buffer previously returned by {@link #getChunkToWrite()}
   */
  void doneWriting(RegionEntryBuffer buffer) {
    synchronized (this) {
      boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
      assert removed;
    }
    long size = buffer.heapSize();

    synchronized (controller.dataAvailable) {
      totalBuffered -= size;
      // We may unblock writers
      controller.dataAvailable.notifyAll();
    }
  }

  /**
   * @param region encoded region name
   * @return {@code true} if the region is currently marked as being written
   */
  synchronized boolean isRegionCurrentlyWriting(byte[] region) {
    return currentlyWriting.contains(region);
  }

  /**
   * Blocks until {@link #totalBuffered} is no longer positive or the calling thread is interrupted.
   * On interruption the wait is abandoned and the thread's interrupt status is restored so that
   * callers can observe the interruption.
   */
  public void waitUntilDrained() {
    synchronized (controller.dataAvailable) {
      while (totalBuffered > 0) {
        try {
          controller.dataAvailable.wait(2000);
        } catch (InterruptedException e) {
          LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
          // wait() cleared the flag when it threw; set it again rather than swallowing it.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  /**
   * A buffer of some number of edits for a given region. This accumulates edits and also provides a
   * memory optimization in order to share a single byte array instance for the table and region
   * name. Also tracks memory usage of the accumulated edits.
   */
  public static class RegionEntryBuffer implements HeapSize {
    /** Running sum of the heap increments returned by {@link #appendEntry(WAL.Entry)}. */
    private long heapInBuffer = 0;
    final List<WAL.Entry> entries;
    final TableName tableName;
    final byte[] encodedRegionName;

    RegionEntryBuffer(TableName tableName, byte[] region) {
      this.tableName = tableName;
      this.encodedRegionName = region;
      this.entries = new ArrayList<>();
    }

    /**
     * Adds the entry to this buffer after pointing its key at this buffer's shared table name and
     * encoded region name instances.
     * @param entry the entry to add
     * @return the heap size accounted for this entry: the edit's heap size plus an aligned
     *         allowance of two references
     */
    long appendEntry(WAL.Entry entry) {
      internify(entry);
      entries.add(entry);
      // TODO linkedlist entry
      // entry size plus WALKey pointers
      long incrHeap = entry.getEdit().heapSize() + ClassSize.align(2 * ClassSize.REFERENCE);
      heapInBuffer += incrHeap;
      return incrHeap;
    }

    /** Makes the entry's key reuse this buffer's table name and encoded region name instances. */
    private void internify(WAL.Entry entry) {
      WALKeyImpl k = entry.getKey();
      k.internTableName(this.tableName);
      k.internEncodedRegionName(this.encodedRegionName);
    }

    /** @return the total heap size accounted for the entries appended so far */
    @Override
    public long heapSize() {
      return heapInBuffer;
    }

    public byte[] getEncodedRegionName() {
      return encodedRegionName;
    }

    public TableName getTableName() {
      return tableName;
    }

    /** @return the live (not copied) list of entries held by this buffer */
    public List<WAL.Entry> getEntries() {
      return this.entries;
    }
  }
}