001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.TreeMap; 026import java.util.TreeSet; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.io.HeapSize; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.util.ClassSize; 031import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * Class which accumulates edits and separates them into a buffer per region while simultaneously 038 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then 039 * pull region-specific buffers from this class. 040 */ 041@InterfaceAudience.Private 042class EntryBuffers { 043 private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class); 044 045 private final PipelineController controller; 046 047 final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR); 048 049 /* 050 * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick 051 * up bytes from a region if we're already writing data for that region in a different IO thread. 052 */ 053 private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR); 054 055 protected long totalBuffered = 0; 056 protected final long maxHeapUsage; 057 058 public EntryBuffers(PipelineController controller, long maxHeapUsage) { 059 this.controller = controller; 060 this.maxHeapUsage = maxHeapUsage; 061 } 062 063 /** 064 * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has 065 * crossed the specified threshold. 066 */ 067 void appendEntry(WAL.Entry entry) throws InterruptedException, IOException { 068 WALKey key = entry.getKey(); 069 RegionEntryBuffer buffer; 070 long incrHeap; 071 synchronized (this) { 072 buffer = buffers.get(key.getEncodedRegionName()); 073 if (buffer == null) { 074 buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName()); 075 buffers.put(key.getEncodedRegionName(), buffer); 076 } 077 incrHeap = buffer.appendEntry(entry); 078 } 079 080 // If we crossed the chunk threshold, wait for more space to be available 081 synchronized (controller.dataAvailable) { 082 totalBuffered += incrHeap; 083 while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { 084 LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered); 085 controller.dataAvailable.wait(2000); 086 } 087 controller.dataAvailable.notifyAll(); 088 } 089 controller.checkForErrors(); 090 } 091 092 /** 093 * @return RegionEntryBuffer a buffer of edits to be written. 094 */ 095 synchronized RegionEntryBuffer getChunkToWrite() { 096 long biggestSize = 0; 097 byte[] biggestBufferKey = null; 098 099 for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) { 100 long size = entry.getValue().heapSize(); 101 if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) { 102 biggestSize = size; 103 biggestBufferKey = entry.getKey(); 104 } 105 } 106 if (biggestBufferKey == null) { 107 return null; 108 } 109 110 RegionEntryBuffer buffer = buffers.remove(biggestBufferKey); 111 currentlyWriting.add(biggestBufferKey); 112 return buffer; 113 } 114 115 void doneWriting(RegionEntryBuffer buffer) { 116 synchronized (this) { 117 boolean removed = currentlyWriting.remove(buffer.encodedRegionName); 118 assert removed; 119 } 120 long size = buffer.heapSize(); 121 122 synchronized (controller.dataAvailable) { 123 totalBuffered -= size; 124 // We may unblock writers 125 controller.dataAvailable.notifyAll(); 126 } 127 } 128 129 synchronized boolean isRegionCurrentlyWriting(byte[] region) { 130 return currentlyWriting.contains(region); 131 } 132 133 /** 134 * A buffer of some number of edits for a given region. This accumulates edits and also provides a 135 * memory optimization in order to share a single byte array instance for the table and region 136 * name. Also tracks memory usage of the accumulated edits. 137 */ 138 static class RegionEntryBuffer implements HeapSize { 139 private long heapInBuffer = 0; 140 final List<WAL.Entry> entryBuffer; 141 final TableName tableName; 142 final byte[] encodedRegionName; 143 144 RegionEntryBuffer(TableName tableName, byte[] region) { 145 this.tableName = tableName; 146 this.encodedRegionName = region; 147 this.entryBuffer = new ArrayList<>(); 148 } 149 150 long appendEntry(WAL.Entry entry) { 151 internify(entry); 152 entryBuffer.add(entry); 153 // TODO linkedlist entry 154 // entry size plus WALKey pointers 155 long incrHeap = entry.getEdit().heapSize() + ClassSize.align(2 * ClassSize.REFERENCE); 156 heapInBuffer += incrHeap; 157 return incrHeap; 158 } 159 160 private void internify(WAL.Entry entry) { 161 WALKeyImpl k = entry.getKey(); 162 k.internTableName(this.tableName); 163 k.internEncodedRegionName(this.encodedRegionName); 164 } 165 166 @Override 167 public long heapSize() { 168 return heapInBuffer; 169 } 170 171 public byte[] getEncodedRegionName() { 172 return encodedRegionName; 173 } 174 175 public TableName getTableName() { 176 return tableName; 177 } 178 } 179}