001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.CompletionException; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentLinkedQueue; 032import java.util.concurrent.atomic.AtomicLong; 033import java.util.regex.Matcher; 034import java.util.regex.Pattern; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.util.Pair; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}. 044 */ 045@InterfaceAudience.Private 046class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator { 047 048 private static final Logger LOG = 049 LoggerFactory.getLogger(BufferedMutatorOverAsyncBufferedMutator.class); 050 051 private final AsyncBufferedMutator mutator; 052 053 private final ExceptionListener listener; 054 055 private final Set<CompletableFuture<Void>> futures = ConcurrentHashMap.newKeySet(); 056 057 private final AtomicLong bufferedSize = new AtomicLong(0); 058 059 private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors = 060 new ConcurrentLinkedQueue<>(); 061 062 BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator, 063 ExceptionListener listener) { 064 this.mutator = mutator; 065 this.listener = listener; 066 } 067 068 @Override 069 public TableName getName() { 070 return mutator.getName(); 071 } 072 073 @Override 074 public Configuration getConfiguration() { 075 return mutator.getConfiguration(); 076 } 077 078 @Override 079 public void mutate(Mutation mutation) throws IOException { 080 mutate(Collections.singletonList(mutation)); 081 } 082 083 private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed"); 084 085 // not always work, so may return an empty string 086 private String getHostnameAndPort(Throwable error) { 087 Matcher matcher = ADDR_MSG_MATCHER.matcher(error.getMessage()); 088 if (matcher.matches()) { 089 return matcher.group(1); 090 } else { 091 return ""; 092 } 093 } 094 095 private RetriesExhaustedWithDetailsException makeError() { 096 List<Row> rows = new ArrayList<>(); 097 List<Throwable> throwables = new ArrayList<>(); 098 List<String> hostnameAndPorts = new ArrayList<>(); 099 for (;;) { 100 Pair<Mutation, Throwable> pair = errors.poll(); 101 if (pair == null) { 102 break; 103 } 104 rows.add(pair.getFirst()); 105 throwables.add(pair.getSecond()); 106 hostnameAndPorts.add(getHostnameAndPort(pair.getSecond())); 107 } 108 return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts); 109 } 110 111 private void internalFlush() throws RetriesExhaustedWithDetailsException { 112 // should get the future array before calling mutator.flush, otherwise we may hit an infinite 113 // wait, since someone may add new future to the map after we calling the flush. 114 CompletableFuture<?>[] toWait = futures.toArray(new CompletableFuture<?>[0]); 115 mutator.flush(); 116 try { 117 CompletableFuture.allOf(toWait).join(); 118 } catch (CompletionException e) { 119 // just ignore, we will record the actual error in the errors field 120 LOG.debug("Flush failed, you should get an exception thrown to your code", e); 121 } 122 if (!errors.isEmpty()) { 123 RetriesExhaustedWithDetailsException error = makeError(); 124 listener.onException(error, this); 125 } 126 } 127 128 @Override 129 public void mutate(List<? extends Mutation> mutations) throws IOException { 130 List<CompletableFuture<Void>> fs = mutator.mutate(mutations); 131 for (int i = 0, n = fs.size(); i < n; i++) { 132 CompletableFuture<Void> toComplete = new CompletableFuture<>(); 133 futures.add(toComplete); 134 Mutation mutation = mutations.get(i); 135 long heapSize = mutation.heapSize(); 136 bufferedSize.addAndGet(heapSize); 137 addListener(fs.get(i), (r, e) -> { 138 bufferedSize.addAndGet(-heapSize); 139 if (e != null) { 140 errors.add(Pair.newPair(mutation, e)); 141 toComplete.completeExceptionally(e); 142 } else { 143 toComplete.complete(r); 144 } 145 // Only remove future after completing, and add the error to errors field before completing, 146 // this is used as a guard in internalFlush method, which is used to make sure that 147 // 1. If the future is already completed and removed from futures, the error should have 148 // already been in errors field, so in internalFlush method, even if we do not wait on the 149 // future, we should still get this error in errors field at the end of the internalFlush 150 // method 151 // 2. If we get this future in the internalFlush method for waiting, then after the future 152 // complete, we should get this error in the errors field at the end of the internalFlush 153 // method 154 futures.remove(toComplete); 155 }); 156 } 157 synchronized (this) { 158 if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) { 159 // We have too many mutations which are not completed yet, let's call a flush to release the 160 // memory to prevent OOM 161 // We use buffer size * 2 is because that, the async buffered mutator will flush 162 // automatically when the write buffer size limit is reached, so usually we do not need to 163 // call flush explicitly if the buffered size is only a little larger than the buffer size 164 // limit. But if the buffered size is too large(2 times of the buffer size), we still need 165 // to block here to prevent OOM. 166 internalFlush(); 167 } else if (!errors.isEmpty()) { 168 RetriesExhaustedWithDetailsException error = makeError(); 169 listener.onException(error, this); 170 } 171 } 172 } 173 174 @Override 175 public synchronized void close() throws IOException { 176 internalFlush(); 177 mutator.close(); 178 } 179 180 @Override 181 public synchronized void flush() throws IOException { 182 internalFlush(); 183 } 184 185 @Override 186 public long getWriteBufferSize() { 187 return mutator.getWriteBufferSize(); 188 } 189 190 @Override 191 public void setRpcTimeout(int timeout) { 192 // no effect 193 } 194 195 @Override 196 public void setOperationTimeout(int timeout) { 197 // no effect 198 } 199 200 @Override 201 public Map<String, byte[]> getRequestAttributes() { 202 return mutator.getRequestAttributes(); 203 } 204}