001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.Map; 022import java.util.Set; 023import java.util.TreeMap; 024import java.util.TreeSet; 025 026import org.apache.hadoop.hbase.util.Bytes; 027import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; 028import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Class which accumulates edits and separates them into a buffer per region while simultaneously 036 * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then 037 * pull region-specific buffers from this class. 038 */ 039@InterfaceAudience.Private 040public class EntryBuffers { 041 private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class); 042 043 PipelineController controller; 044 045 Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR); 046 047 /* 048 * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick 049 * up bytes from a region if we're already writing data for that region in a different IO thread. 050 */ 051 Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR); 052 053 long totalBuffered = 0; 054 long maxHeapUsage; 055 boolean splitWriterCreationBounded; 056 057 public EntryBuffers(PipelineController controller, long maxHeapUsage) { 058 this(controller, maxHeapUsage, false); 059 } 060 061 public EntryBuffers(PipelineController controller, long maxHeapUsage, 062 boolean splitWriterCreationBounded) { 063 this.controller = controller; 064 this.maxHeapUsage = maxHeapUsage; 065 this.splitWriterCreationBounded = splitWriterCreationBounded; 066 } 067 068 /** 069 * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has 070 * crossed the specified threshold. 071 */ 072 public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException { 073 WALKey key = entry.getKey(); 074 RegionEntryBuffer buffer; 075 long incrHeap; 076 synchronized (this) { 077 buffer = buffers.get(key.getEncodedRegionName()); 078 if (buffer == null) { 079 buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName()); 080 buffers.put(key.getEncodedRegionName(), buffer); 081 } 082 incrHeap = buffer.appendEntry(entry); 083 } 084 085 // If we crossed the chunk threshold, wait for more space to be available 086 synchronized (controller.dataAvailable) { 087 totalBuffered += incrHeap; 088 while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { 089 LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered); 090 controller.dataAvailable.wait(2000); 091 } 092 controller.dataAvailable.notifyAll(); 093 } 094 controller.checkForErrors(); 095 } 096 097 /** 098 * @return RegionEntryBuffer a buffer of edits to be written. 099 */ 100 synchronized RegionEntryBuffer getChunkToWrite() { 101 // The core part of limiting opening writers is it doesn't return chunk only if the 102 // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each 103 // region during splitting. It will flush all the logs in the buffer after splitting 104 // through a threadpool, which means the number of writers it created is under control. 105 if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) { 106 return null; 107 } 108 long biggestSize = 0; 109 byte[] biggestBufferKey = null; 110 111 for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) { 112 long size = entry.getValue().heapSize(); 113 if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) { 114 biggestSize = size; 115 biggestBufferKey = entry.getKey(); 116 } 117 } 118 if (biggestBufferKey == null) { 119 return null; 120 } 121 122 RegionEntryBuffer buffer = buffers.remove(biggestBufferKey); 123 currentlyWriting.add(biggestBufferKey); 124 return buffer; 125 } 126 127 void doneWriting(RegionEntryBuffer buffer) { 128 synchronized (this) { 129 boolean removed = currentlyWriting.remove(buffer.encodedRegionName); 130 assert removed; 131 } 132 long size = buffer.heapSize(); 133 134 synchronized (controller.dataAvailable) { 135 totalBuffered -= size; 136 // We may unblock writers 137 controller.dataAvailable.notifyAll(); 138 } 139 } 140 141 synchronized boolean isRegionCurrentlyWriting(byte[] region) { 142 return currentlyWriting.contains(region); 143 } 144 145 public void waitUntilDrained() { 146 synchronized (controller.dataAvailable) { 147 while (totalBuffered > 0) { 148 try { 149 controller.dataAvailable.wait(2000); 150 } catch (InterruptedException e) { 151 LOG.warn("Got interrupted while waiting for EntryBuffers is drained"); 152 Thread.interrupted(); 153 break; 154 } 155 } 156 } 157 } 158}