/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}.
 * <p>
 * Mutations are handed to the wrapped {@link AsyncBufferedMutator}. Every failed mutation is
 * recorded together with its cause, and the recorded failures are later reported to the
 * {@link ExceptionListener} as a single {@link RetriesExhaustedWithDetailsException}, either
 * during a flush or at the end of a subsequent {@link #mutate(List)} call.
 */
@InterfaceAudience.Private
class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator {

  private static final Logger LOG =
    LoggerFactory.getLogger(BufferedMutatorOverAsyncBufferedMutator.class);

  /** Extracts the address part out of messages of the form "Call to &lt;address&gt; failed". */
  private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed");

  private final AsyncBufferedMutator mutator;

  private final ExceptionListener listener;

  /** One future per mutation which has been submitted but is not completed yet. */
  private final Set<CompletableFuture<Void>> futures = ConcurrentHashMap.newKeySet();

  /** Sum of the heap sizes of the mutations which are not completed yet. */
  private final AtomicLong bufferedSize = new AtomicLong(0);

  /** Failed mutations, paired with their cause, which have not been reported yet. */
  private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors =
    new ConcurrentLinkedQueue<>();

  /**
   * @param mutator the async mutator that all operations are delegated to
   * @param listener the listener which is notified about failed mutations
   */
  BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator,
    ExceptionListener listener) {
    this.mutator = mutator;
    this.listener = listener;
  }

  @Override
  public TableName getName() {
    return mutator.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return mutator.getConfiguration();
  }

  @Override
  public void mutate(Mutation mutation) throws IOException {
    mutate(Collections.singletonList(mutation));
  }

  /**
   * Tries to extract the remote address from the message of the given error.
   * <p>
   * This does not always work, so the result may be an empty string.
   * @param error the failure of a mutation
   * @return the text captured by {@link #ADDR_MSG_MATCHER}, or an empty string if the error has no
   *         message or the message does not contain the expected pattern
   */
  private static String getHostnameAndPort(Throwable error) {
    String message = error.getMessage();
    if (message == null) {
      // Throwable.getMessage() may return null, and Pattern.matcher(null) throws an NPE
      return "";
    }
    Matcher matcher = ADDR_MSG_MATCHER.matcher(message);
    // use find() instead of matches(): the pattern only describes the beginning of the message,
    // so requiring the whole message to match would miss any message with trailing details
    return matcher.find() ? matcher.group(1) : "";
  }

  /**
   * Drains all the recorded failures into a single exception.
   * @return an exception carrying every failed mutation, its cause and, where it could be
   *         extracted, the related address
   */
  private RetriesExhaustedWithDetailsException makeError() {
    List<Row> rows = new ArrayList<>();
    List<Throwable> throwables = new ArrayList<>();
    List<String> hostnameAndPorts = new ArrayList<>();
    Pair<Mutation, Throwable> pair;
    while ((pair = errors.poll()) != null) {
      rows.add(pair.getFirst());
      throwables.add(pair.getSecond());
      hostnameAndPorts.add(getHostnameAndPort(pair.getSecond()));
    }
    return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts);
  }

  /**
   * Flushes the underlying mutator, waits for the mutations which were pending when this method
   * was entered, and reports the recorded failures, if any, to the listener.
   * @throws RetriesExhaustedWithDetailsException if the listener throws it
   */
  private void internalFlush() throws RetriesExhaustedWithDetailsException {
    // should get the future array before calling mutator.flush, otherwise we may hit an infinite
    // wait, since someone may add new futures to the set after we call the flush.
    CompletableFuture<?>[] toWait = futures.toArray(new CompletableFuture<?>[0]);
    mutator.flush();
    try {
      CompletableFuture.allOf(toWait).join();
    } catch (CompletionException e) {
      // just ignore, we will record the actual error in the errors field
      LOG.debug("Flush failed, you should get an exception thrown to your code", e);
    }
    if (!errors.isEmpty()) {
      RetriesExhaustedWithDetailsException error = makeError();
      listener.onException(error, this);
    }
  }

  /**
   * Submits the given mutations to the underlying mutator and tracks their completion.
   * <p>
   * If the size of the uncompleted mutations exceeds two times the write buffer size, this method
   * blocks on a flush. Otherwise, failures recorded so far are reported to the listener.
   * @param mutations the mutations to submit
   * @throws IOException if the listener throws while being notified about failed mutations
   */
  @Override
  public void mutate(List<? extends Mutation> mutations) throws IOException {
    List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
    for (int i = 0, n = fs.size(); i < n; i++) {
      CompletableFuture<Void> toComplete = new CompletableFuture<>();
      futures.add(toComplete);
      Mutation mutation = mutations.get(i);
      long heapSize = mutation.heapSize();
      bufferedSize.addAndGet(heapSize);
      addListener(fs.get(i), (r, e) -> {
        futures.remove(toComplete);
        bufferedSize.addAndGet(-heapSize);
        if (e != null) {
          // record the error before completing the future, so that a flush which is waiting on
          // the future will see the error once it wakes up
          errors.add(Pair.newPair(mutation, e));
          toComplete.completeExceptionally(e);
        } else {
          toComplete.complete(r);
        }
      });
    }
    synchronized (this) {
      if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) {
        // We have too many mutations which are not completed yet, let's call a flush to release the
        // memory to prevent OOM.
        // We use buffer size * 2 because the async buffered mutator will flush automatically when
        // the write buffer size limit is reached, so usually we do not need to call flush
        // explicitly if the buffered size is only a little larger than the buffer size limit. But
        // if the buffered size is too large (2 times the buffer size), we still need to block here
        // to prevent OOM.
        internalFlush();
      } else if (!errors.isEmpty()) {
        RetriesExhaustedWithDetailsException error = makeError();
        listener.onException(error, this);
      }
    }
  }

  /**
   * Flushes the pending mutations and closes the underlying mutator.
   * <p>
   * The underlying mutator is closed even if the flush throws.
   * @throws IOException if the listener throws while being notified about failed mutations
   */
  @Override
  public synchronized void close() throws IOException {
    try {
      internalFlush();
    } finally {
      // always release the underlying mutator, even if the flush above reported failures
      mutator.close();
    }
  }

  @Override
  public synchronized void flush() throws IOException {
    internalFlush();
  }

  @Override
  public long getWriteBufferSize() {
    return mutator.getWriteBufferSize();
  }

  @Override
  public void setRpcTimeout(int timeout) {
    // no effect
  }

  @Override
  public void setOperationTimeout(int timeout) {
    // no effect
  }

  @Override
  public Map<String, byte[]> getRequestAttributes() {
    return mutator.getRequestAttributes();
  }
}