001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.ByteArrayOutputStream; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.Iterator; 026import java.util.List; 027import java.util.stream.Collectors; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.Cell.Type; 030import org.apache.hadoop.hbase.CellBuilderFactory; 031import org.apache.hadoop.hbase.CellBuilderType; 032import org.apache.hadoop.hbase.ClientMetaTableAccessor; 033import org.apache.hadoop.hbase.ClientMetaTableAccessor.QueryType; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.MetaTableAccessor; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.ResultScanner; 043import org.apache.hadoop.hbase.client.Scan; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 046import org.apache.hadoop.hbase.master.RegionState; 047import org.apache.hadoop.hbase.master.RegionState.State; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.Pair; 050import org.apache.yetus.audience.InterfaceAudience; 051 052/** 053 * Helper class for storing replication barriers in family 'rep_barrier' of meta table. 054 * <p/> 055 * See SerialReplicationChecker on how to make use of the barriers. 056 */ 057@InterfaceAudience.Private 058public final class ReplicationBarrierFamilyFormat { 059 060 public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent"); 061 062 private static final byte ESCAPE_BYTE = (byte) 0xFF; 063 064 private static final byte SEPARATED_BYTE = 0x00; 065 066 private ReplicationBarrierFamilyFormat() { 067 } 068 069 public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException { 070 put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow()) 071 .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(HConstants.SEQNUM_QUALIFIER) 072 .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(Bytes.toBytes(openSeqNum)) 073 .build()); 074 } 075 076 private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) { 077 for (byte b : regionName) { 078 if (b == ESCAPE_BYTE) { 079 out.write(ESCAPE_BYTE); 080 } 081 out.write(b); 082 } 083 } 084 085 public static byte[] getParentsBytes(List<RegionInfo> parents) { 086 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 087 Iterator<RegionInfo> iter = parents.iterator(); 088 writeRegionName(bos, iter.next().getRegionName()); 089 while (iter.hasNext()) { 090 bos.write(ESCAPE_BYTE); 091 bos.write(SEPARATED_BYTE); 092 writeRegionName(bos, iter.next().getRegionName()); 093 } 094 return bos.toByteArray(); 095 } 096 097 private static List<byte[]> parseParentsBytes(byte[] bytes) { 098 List<byte[]> parents = new ArrayList<>(); 099 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 100 for (int i = 0; i < bytes.length; i++) { 101 if (bytes[i] == ESCAPE_BYTE) { 102 i++; 103 if (bytes[i] == SEPARATED_BYTE) { 104 parents.add(bos.toByteArray()); 105 bos.reset(); 106 continue; 107 } 108 // fall through to append the byte 109 } 110 bos.write(bytes[i]); 111 } 112 if (bos.size() > 0) { 113 parents.add(bos.toByteArray()); 114 } 115 return parents; 116 } 117 118 public static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException { 119 byte[] value = getParentsBytes(parents); 120 put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow()) 121 .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER) 122 .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build()); 123 } 124 125 public static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts) 126 throws IOException { 127 Put put = new Put(regionInfo.getRegionName(), ts); 128 addReplicationBarrier(put, openSeqNum); 129 return put; 130 } 131 132 public static final class ReplicationBarrierResult { 133 private final long[] barriers; 134 private final RegionState.State state; 135 private final List<byte[]> parentRegionNames; 136 137 ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) { 138 this.barriers = barriers; 139 this.state = state; 140 this.parentRegionNames = parentRegionNames; 141 } 142 143 public long[] getBarriers() { 144 return barriers; 145 } 146 147 public RegionState.State getState() { 148 return state; 149 } 150 151 public List<byte[]> getParentRegionNames() { 152 return parentRegionNames; 153 } 154 155 @Override 156 public String toString() { 157 return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) + ", state=" + state 158 + ", parentRegionNames=" 159 + parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", ")) 160 + "]"; 161 } 162 } 163 164 private static long getReplicationBarrier(Cell c) { 165 return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()); 166 } 167 168 public static long[] getReplicationBarriers(Result result) { 169 return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER) 170 .stream().mapToLong(ReplicationBarrierFamilyFormat::getReplicationBarrier).sorted().distinct() 171 .toArray(); 172 } 173 174 private static ReplicationBarrierResult getReplicationBarrierResult(Result result) { 175 long[] barriers = getReplicationBarriers(result); 176 byte[] stateBytes = result.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER); 177 RegionState.State state = 178 stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null; 179 byte[] parentRegionsBytes = 180 result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER); 181 List<byte[]> parentRegionNames = 182 parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList(); 183 return new ReplicationBarrierResult(barriers, state, parentRegionNames); 184 } 185 186 public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn, 187 TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException { 188 byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false); 189 byte[] metaStopKey = 190 RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); 191 Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey) 192 .addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER) 193 .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true) 194 .setCaching(10); 195 try (Table table = conn.getTable(TableName.META_TABLE_NAME); 196 ResultScanner scanner = table.getScanner(scan)) { 197 for (Result result;;) { 198 result = scanner.next(); 199 if (result == null) { 200 return new ReplicationBarrierResult(new long[0], null, Collections.emptyList()); 201 } 202 byte[] regionName = result.getRow(); 203 // TODO: we may look up a region which has already been split or merged so we need to check 204 // whether the encoded name matches. Need to find a way to quit earlier when there is no 205 // record for the given region, for now it will scan to the end of the table. 206 if ( 207 !Bytes.equals(encodedRegionName, Bytes.toBytes(RegionInfo.encodeRegionName(regionName))) 208 ) { 209 continue; 210 } 211 return getReplicationBarrierResult(result); 212 } 213 } 214 } 215 216 public static long[] getReplicationBarriers(Connection conn, byte[] regionName) 217 throws IOException { 218 try (Table table = conn.getTable(TableName.META_TABLE_NAME)) { 219 Result result = table.get(new Get(regionName) 220 .addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER) 221 .readAllVersions()); 222 return getReplicationBarriers(result); 223 } 224 } 225 226 public static List<Pair<String, Long>> getTableEncodedRegionNameAndLastBarrier(Connection conn, 227 TableName tableName) throws IOException { 228 List<Pair<String, Long>> list = new ArrayList<>(); 229 MetaTableAccessor.scanMeta(conn, 230 ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REPLICATION), 231 ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REPLICATION), 232 QueryType.REPLICATION, r -> { 233 byte[] value = 234 r.getValue(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER); 235 if (value == null) { 236 return true; 237 } 238 long lastBarrier = Bytes.toLong(value); 239 String encodedRegionName = RegionInfo.encodeRegionName(r.getRow()); 240 list.add(Pair.newPair(encodedRegionName, lastBarrier)); 241 return true; 242 }); 243 return list; 244 } 245 246 public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn, 247 TableName tableName) throws IOException { 248 List<String> list = new ArrayList<>(); 249 MetaTableAccessor.scanMeta(conn, 250 ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REPLICATION), 251 ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REPLICATION), 252 QueryType.REPLICATION, new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> { 253 list.add(RegionInfo.encodeRegionName(r.getRow())); 254 return true; 255 }); 256 return list; 257 } 258}