1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.NoSuchElementException;
25 import java.util.Random;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.RemoteExceptionHandler;
38 import org.apache.hadoop.hbase.TableNotDisabledException;
39 import org.apache.hadoop.hbase.MetaTableAccessor;
40 import org.apache.hadoop.hbase.client.Admin;
41 import org.apache.hadoop.hbase.client.Delete;
42 import org.apache.hadoop.hbase.client.HBaseAdmin;
43 import org.apache.hadoop.hbase.client.HConnectable;
44 import org.apache.hadoop.hbase.client.HConnection;
45 import org.apache.hadoop.hbase.client.HConnectionManager;
46 import org.apache.hadoop.hbase.client.HTable;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.ResultScanner;
49 import org.apache.hadoop.hbase.client.Table;
50 import org.apache.hadoop.hbase.regionserver.HRegion;
51 import org.apache.hadoop.hbase.wal.WALFactory;
52
53
54
55
56
57 @InterfaceAudience.Private
58 class HMerge {
59
/** Logger used by the merge helpers declared in this class. */
private static final Log LOG = LogFactory.getLog(HMerge.class);

/** Package-visible random number source. */
static final Random rand = new Random();

/**
 * Private constructor: the class cannot be instantiated from outside.
 */
private HMerge() {
  // The superclass constructor is invoked implicitly.
}
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 public static void merge(Configuration conf, FileSystem fs,
84 final TableName tableName)
85 throws IOException {
86 merge(conf, fs, tableName, true);
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 public static void merge(Configuration conf, FileSystem fs,
105 final TableName tableName, final boolean testMasterRunning)
106 throws IOException {
107 boolean masterIsRunning = false;
108 if (testMasterRunning) {
109 masterIsRunning = HConnectionManager
110 .execute(new HConnectable<Boolean>(conf) {
111 @Override
112 public Boolean connect(HConnection connection) throws IOException {
113 return connection.isMasterRunning();
114 }
115 });
116 }
117 if (tableName.equals(TableName.META_TABLE_NAME)) {
118 if (masterIsRunning) {
119 throw new IllegalStateException(
120 "Can not compact hbase:meta table if instance is on-line");
121 }
122
123 } else {
124 if(!masterIsRunning) {
125 throw new IllegalStateException(
126 "HBase instance must be running to merge a normal table");
127 }
128 Admin admin = new HBaseAdmin(conf);
129 try {
130 if (!admin.isTableDisabled(tableName)) {
131 throw new TableNotDisabledException(tableName);
132 }
133 } finally {
134 admin.close();
135 }
136 new OnlineMerger(conf, fs, tableName).process();
137 }
138 }
139
140 private static abstract class Merger {
141 protected final Configuration conf;
142 protected final FileSystem fs;
143 protected final Path rootDir;
144 protected final HTableDescriptor htd;
145 protected final WALFactory walFactory;
146 private final long maxFilesize;
147
148
149 protected Merger(Configuration conf, FileSystem fs, final TableName tableName)
150 throws IOException {
151 this.conf = conf;
152 this.fs = fs;
153 this.maxFilesize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
154 HConstants.DEFAULT_MAX_FILE_SIZE);
155
156 this.rootDir = FSUtils.getRootDir(conf);
157 Path tabledir = FSUtils.getTableDir(this.rootDir, tableName);
158 this.htd = FSTableDescriptors.getTableDescriptorFromFs(this.fs, tabledir);
159 String logname = "merge_" + System.currentTimeMillis() + HConstants.HREGION_LOGDIR_NAME;
160
161 final Configuration walConf = new Configuration(conf);
162 FSUtils.setRootDir(walConf, tabledir);
163 this.walFactory = new WALFactory(walConf, null, logname);
164 }
165
166 void process() throws IOException {
167 try {
168 for (HRegionInfo[] regionsToMerge = next();
169 regionsToMerge != null;
170 regionsToMerge = next()) {
171 if (!merge(regionsToMerge)) {
172 return;
173 }
174 }
175 } finally {
176 try {
177 walFactory.close();
178 } catch(IOException e) {
179 LOG.error(e);
180 }
181 }
182 }
183
184 protected boolean merge(final HRegionInfo[] info) throws IOException {
185 if (info.length < 2) {
186 LOG.info("only one region - nothing to merge");
187 return false;
188 }
189
190 HRegion currentRegion = null;
191 long currentSize = 0;
192 HRegion nextRegion = null;
193 long nextSize = 0;
194 for (int i = 0; i < info.length - 1; i++) {
195 if (currentRegion == null) {
196 currentRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i], this.htd,
197 walFactory.getWAL(info[i].getEncodedNameAsBytes()));
198 currentSize = currentRegion.getLargestHStoreSize();
199 }
200 nextRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i + 1], this.htd,
201 walFactory.getWAL(info[i+1].getEncodedNameAsBytes()));
202 nextSize = nextRegion.getLargestHStoreSize();
203
204 if ((currentSize + nextSize) <= (maxFilesize / 2)) {
205
206
207 LOG.info("Merging regions " + currentRegion.getRegionInfo().getRegionNameAsString() +
208 " and " + nextRegion.getRegionInfo().getRegionNameAsString());
209 HRegion mergedRegion =
210 HRegion.mergeAdjacent(currentRegion, nextRegion);
211 updateMeta(currentRegion.getRegionInfo().getRegionName(),
212 nextRegion.getRegionInfo().getRegionName(), mergedRegion);
213 break;
214 }
215 LOG.info("not merging regions " +
216 Bytes.toStringBinary(currentRegion.getRegionInfo().getRegionName()) +
217 " and " + Bytes.toStringBinary(nextRegion.getRegionInfo().getRegionName()));
218 currentRegion.close();
219 currentRegion = nextRegion;
220 currentSize = nextSize;
221 }
222 if(currentRegion != null) {
223 currentRegion.close();
224 }
225 return true;
226 }
227
228 protected abstract HRegionInfo[] next() throws IOException;
229
230 protected abstract void updateMeta(final byte [] oldRegion1,
231 final byte [] oldRegion2, HRegion newRegion)
232 throws IOException;
233
234 }
235
236
237 private static class OnlineMerger extends Merger {
238 private final TableName tableName;
239 private final Table table;
240 private final ResultScanner metaScanner;
241 private HRegionInfo latestRegion;
242
243 OnlineMerger(Configuration conf, FileSystem fs,
244 final TableName tableName)
245 throws IOException {
246 super(conf, fs, tableName);
247 this.tableName = tableName;
248 this.table = new HTable(conf, TableName.META_TABLE_NAME);
249 this.metaScanner = table.getScanner(HConstants.CATALOG_FAMILY,
250 HConstants.REGIONINFO_QUALIFIER);
251 this.latestRegion = null;
252 }
253
254 private HRegionInfo nextRegion() throws IOException {
255 try {
256 Result results = getMetaRow();
257 if (results == null) {
258 return null;
259 }
260 HRegionInfo region = HRegionInfo.getHRegionInfo(results);
261 if (region == null) {
262 throw new NoSuchElementException("meta region entry missing " +
263 Bytes.toString(HConstants.CATALOG_FAMILY) + ":" +
264 Bytes.toString(HConstants.REGIONINFO_QUALIFIER));
265 }
266 if (!region.getTable().equals(this.tableName)) {
267 return null;
268 }
269 return region;
270 } catch (IOException e) {
271 e = RemoteExceptionHandler.checkIOException(e);
272 LOG.error("meta scanner error", e);
273 metaScanner.close();
274 throw e;
275 }
276 }
277
278
279
280
281
282
283 private Result getMetaRow() throws IOException {
284 Result currentRow = metaScanner.next();
285 boolean foundResult = false;
286 while (currentRow != null) {
287 LOG.info("Row: <" + Bytes.toStringBinary(currentRow.getRow()) + ">");
288 byte[] regionInfoValue = currentRow.getValue(HConstants.CATALOG_FAMILY,
289 HConstants.REGIONINFO_QUALIFIER);
290 if (regionInfoValue == null || regionInfoValue.length == 0) {
291 currentRow = metaScanner.next();
292 continue;
293 }
294 HRegionInfo region = HRegionInfo.getHRegionInfo(currentRow);
295 if (!region.getTable().equals(this.tableName)) {
296 currentRow = metaScanner.next();
297 continue;
298 }
299 foundResult = true;
300 break;
301 }
302 return foundResult ? currentRow : null;
303 }
304
305 @Override
306 protected HRegionInfo[] next() throws IOException {
307 List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
308 if(latestRegion == null) {
309 latestRegion = nextRegion();
310 }
311 if(latestRegion != null) {
312 regions.add(latestRegion);
313 }
314 latestRegion = nextRegion();
315 if(latestRegion != null) {
316 regions.add(latestRegion);
317 }
318 return regions.toArray(new HRegionInfo[regions.size()]);
319 }
320
321 @Override
322 protected void updateMeta(final byte [] oldRegion1,
323 final byte [] oldRegion2,
324 HRegion newRegion)
325 throws IOException {
326 byte[][] regionsToDelete = {oldRegion1, oldRegion2};
327 for (int r = 0; r < regionsToDelete.length; r++) {
328 if(Bytes.equals(regionsToDelete[r], latestRegion.getRegionName())) {
329 latestRegion = null;
330 }
331 Delete delete = new Delete(regionsToDelete[r]);
332 table.delete(delete);
333 if(LOG.isDebugEnabled()) {
334 LOG.debug("updated columns in row: " + Bytes.toStringBinary(regionsToDelete[r]));
335 }
336 }
337 newRegion.getRegionInfo().setOffline(true);
338
339 MetaTableAccessor.addRegionToMeta(table, newRegion.getRegionInfo());
340
341 if(LOG.isDebugEnabled()) {
342 LOG.debug("updated columns in row: "
343 + Bytes.toStringBinary(newRegion.getRegionInfo().getRegionName()));
344 }
345 }
346 }
347 }