
Phoenix JDBC Driver: Query Source Code Analysis

2016-08-04

An analysis of the Phoenix JDBC driver query source code. Since Phoenix supports the JDBC protocol, it has to implement the JDBC protocol interfaces.

Phoenix driver query source code analysis

Since Phoenix supports the JDBC protocol, it must implement the JDBC protocol interfaces.
The first thing to implement is the driver, PhoenixDriver, which is responsible for creating connections and establishing the link to the target server.
This class contains a method for creating connections:

protected final Connection createConnection(String url, Properties info) throws SQLException {
  Properties augmentedInfo = PropertiesUtil.deepCopy(info);
  augmentedInfo.putAll(getDefaultProps().asMap());
  ConnectionQueryServices connectionServices = getConnectionQueryServices(url, augmentedInfo);
  PhoenixConnection connection = connectionServices.connect(url, augmentedInfo);
  return connection;
}
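For orientation, here is a minimal sketch of how an application reaches this code path through plain JDBC. The driver class and the jdbc:phoenix: URL prefix are the standard Phoenix ones; the ZooKeeper address and the table name are made-up examples.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixJdbcExample {
    public static void main(String[] args) throws Exception {
        // Registers org.apache.phoenix.jdbc.PhoenixDriver with DriverManager;
        // jdbc:phoenix:* URLs are then routed to the createConnection() method shown above.
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

        // "localhost:2181" is a made-up ZooKeeper quorum; MY_TABLE is a hypothetical table.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT ID, NAME FROM MY_TABLE LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getLong(1) + " " + rs.getString(2));
            }
        }
    }
}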

The connection query services object used to create that connection is obtained as follows:

@Override
protected ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
    try {
        lockInterruptibly(LockMode.READ);
        checkClosed();
        ConnectionInfo connInfo = ConnectionInfo.create(url);
        QueryServices services = getQueryServices();
        ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps());
        ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo);
        if (connectionQueryServices == null) {
            if (normalizedConnInfo.isConnectionless()) {
                connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
            } else {
                connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
            }
            ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices);
            if (prevValue != null) {
                connectionQueryServices = prevValue;
            }
        }

In ConnectionInfo connInfo = ConnectionInfo.create(url), the ZooKeeper address is parsed out of the URL,
and the connection query services object is then created:

connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
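For reference, the URL strings that ConnectionInfo.create(url) has to handle look roughly like the following. This is a sketch: the quorum, port and znode values are made up, and the exact variants accepted depend on the Phoenix version.

public class PhoenixUrlExamples {
    // Full form: jdbc:phoenix:<zookeeper quorum>[:<port>[:<hbase znode parent>]]
    static final String FULL = "jdbc:phoenix:zk1,zk2,zk3:2181:/hbase";
    // Port and znode may be omitted; defaults come from the client configuration.
    static final String QUORUM_ONLY = "jdbc:phoenix:zk1,zk2,zk3";
    // A "connectionless" URL (mostly used by tests and tools): isConnectionless() returns true
    // and ConnectionlessQueryServicesImpl is used instead of ConnectionQueryServicesImpl.
    static final String CONNECTIONLESS = "jdbc:phoenix:none";
}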

ConnectionQueryServicesImpl then runs its init(final String url, final Properties props) initialization.
First it opens the connection:

private void openConnection() throws SQLException {
    try {
        // check if we need to authenticate with kerberos
        String clientKeytab = this.getProps().get(HBASE_CLIENT_KEYTAB);
        String clientPrincipal = this.getProps().get(HBASE_CLIENT_PRINCIPAL);
        if (clientKeytab != null && clientPrincipal != null) {
            logger.info("Trying to connect to a secure cluster with keytab:" + clientKeytab);
            UserGroupInformation.setConfiguration(config);
            User.login(config, HBASE_CLIENT_KEYTAB, HBASE_CLIENT_PRINCIPAL, null);
            logger.info("Successfull login to secure cluster!!");
        }
        boolean transactionsEnabled = props.getBoolean(
                QueryServices.TRANSACTIONS_ENABLED,
                QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
        // only initialize the tx service client if needed
        if (transactionsEnabled) {
            initTxServiceClient();
        }
        this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
    } catch (IOException e) {
        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
            .setRootCause(e).build().buildException();
    }
    if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above?
        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException();
    }
}

As you can see, the connection being opened is an org.apache.hadoop.hbase.client.HConnection, i.e. a connection to HBase.
The connection object is then wrapped:

metaConnection = new PhoenixConnection(
        ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData());

Looking at this object's initialization, many of Phoenix's features show up here in miniature:

public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade) throws SQLException {
    this.url = url;
    this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
    // Copy so client cannot change
    this.info = info == null ? new Properties() : PropertiesUtil.deepCopy(info);
    final PName tenantId = JDBCUtil.getTenantId(url, info);
    if (this.info.isEmpty() && tenantId == null) {
        this.services = services;
    } else {
        // Create child services keyed by tenantId to track resource usage for
        // a tenantId for all connections on this JVM.
        if (tenantId != null) {
            services = services.getChildQueryServices(tenantId.getBytesPtr());
        }
        ReadOnlyProps currentProps = services.getProps();
        final ReadOnlyProps augmentedProps = currentProps.addAll(filterKnownNonProperties(this.info));
        this.services = augmentedProps == currentProps ? services : new DelegateConnectionQueryServices(services) {
            @Override
            public ReadOnlyProps getProps() {
                return augmentedProps;
            }
        };
    }

    Long scnParam = JDBCUtil.getCurrentSCN(url, this.info);
    checkScn(scnParam);
    this.scn = scnParam;
    this.isAutoFlush = this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)
            && this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_FLUSH);
    this.isAutoCommit = JDBCUtil.getAutoCommit(
            url, this.info,
            this.services.getProps().getBoolean(
                    QueryServices.AUTO_COMMIT_ATTRIB,
                    QueryServicesOptions.DEFAULT_AUTO_COMMIT));
    this.consistency = JDBCUtil.getConsistencyLevel(url, this.info, this.services.getProps()
            .get(QueryServices.CONSISTENCY_ATTRIB,
                    QueryServicesOptions.DEFAULT_CONSISTENCY_LEVEL));
    this.tenantId = tenantId;
    this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, this.services.getProps());
    datePattern = this.services.getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
    timePattern = this.services.getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
    timestampPattern = this.services.getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
    String numberPattern = this.services.getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
    int maxSize = this.services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
    Format dateFormat = DateUtil.getDateFormatter(datePattern);
    Format timeFormat = DateUtil.getDateFormatter(timePattern);
    Format timestampFormat = DateUtil.getDateFormatter(timestampPattern);
    formatters.put(PDate.INSTANCE, dateFormat);
    formatters.put(PTime.INSTANCE, timeFormat);
    formatters.put(PTimestamp.INSTANCE, timestampFormat);
    formatters.put(PUnsignedDate.INSTANCE, dateFormat);
    formatters.put(PUnsignedTime.INSTANCE, timeFormat);
    formatters.put(PUnsignedTimestamp.INSTANCE, timestampFormat);
    formatters.put(PDecimal.INSTANCE, FunctionArgumentType.NUMERIC.getFormatter(numberPattern));
    // We do not limit the metaData on a connection less than the global one,
    // as there's not much that will be cached here.
    Pruner pruner = new Pruner() {

        @Override
        public boolean prune(PTable table) {
            long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
            return (table.getType() != PTableType.SYSTEM &&
                    (table.getTimeStamp() >= maxTimestamp ||
                     !Objects.equal(tenantId, table.getTenantId())));
        }

        @Override
        public boolean prune(PFunction function) {
            long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
            return (function.getTimeStamp() >= maxTimestamp ||
                    !Objects.equal(tenantId, function.getTenantId()));
        }
    };
    this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps());
    this.mutationState = mutationState == null ? newMutationState(maxSize) : new MutationState(mutationState);
    this.metaData = metaData.pruneTables(pruner);
    this.metaData = metaData.pruneFunctions(pruner);
    this.services.addConnection(this);

    // setup tracing, if its enabled
    this.sampler = Tracing.getConfiguredSampler(this);
    this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
    this.scannerQueue = new LinkedBlockingQueue<>();
    this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory();
    GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
}

In the code above you can see tenantId (multi-tenancy), which concerns per-user resource allocation; the scnParam property, which concerns the HBase timestamp used by the connection; and the consistency property, which concerns read consistency. So Phoenix's access to HBase is really a proxy layer wrapped around the HBase connection, adding its own features on top.
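As a hedged sketch of how these features are switched on from the client side: the tenant and SCN are passed as JDBC properties ("TenantId" and "CurrentSCN" are the standard Phoenix property names; the values and the quorum below are made up), and the consistency level is read in the same way via QueryServices.CONSISTENCY_ATTRIB.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class PhoenixConnectionPropsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Multi-tenancy: statements on this connection are scoped to one tenant;
        // internally PhoenixConnection creates child query services keyed by the tenant id.
        props.setProperty("TenantId", "acme");
        // Point-in-time access: fixes the HBase timestamp (SCN) used by this connection.
        props.setProperty("CurrentSCN", "1470268800000");

        try (Connection conn =
                 DriverManager.getConnection("jdbc:phoenix:zk1,zk2,zk3:2181", props)) {
            // use the tenant- and SCN-scoped connection here
        }
    }
}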
Once the connection has been created, Phoenix's metadata tables must be initialized. In ConnectionQueryServicesImpl.init
the relevant metadata tables are checked and, if they do not exist yet, created:

  metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
CREATE_TABLE_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + ...

The above creates the SYSTEM.CATALOG catalog table.
First a statement is created, i.e. the following object:

 PhoenixStatement statement = new PhoenixStatement(this);   

@Override
public int executeUpdate(String sql) throws SQLException {
    CompilableStatement stmt = parseStatement(sql);
    if (!stmt.getOperation().isMutation) {
        throw new ExecuteUpdateNotApplicableException(sql);
    }
    if (!batch.isEmpty()) {
        throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
                .build().buildException();
    }
    int updateCount = executeMutation(stmt);
    flushIfNecessary();
    return updateCount;
}
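A small hedged example of the isMutation check above: DDL and UPSERT statements pass through executeUpdate, while a SELECT routed through it is rejected with ExecuteUpdateNotApplicableException (a SQLException). The table name and quorum are made up.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class ExecuteUpdateCheckExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement()) {
            // Mutations are accepted.
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS DEMO (ID BIGINT PRIMARY KEY, NAME VARCHAR)");
            stmt.executeUpdate("UPSERT INTO DEMO VALUES (1, 'a')");
            try {
                // Not a mutation, so the check above throws.
                stmt.executeUpdate("SELECT * FROM DEMO");
            } catch (SQLException expected) {
                System.out.println("rejected as expected: " + expected.getMessage());
            }
        }
    }
}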

Then the SQL is compiled:

  parser = new PhoenixStatementParser(sql, new ExecutableNodeFactory());    

The object above parses the SQL, and execution then continues in the executeMutation method:

new CallRunner.CallableThrowable<Integer, SQLException>() {
    @Override
    public Integer call() throws SQLException {
        try {
            MutationState state = connection.getMutationState();
            // here the statement is compiled into an execution plan
            MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
            if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null && plan.getTargetRef().getTable().isTransactional()) {
                state.startTransaction();
            }
            Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
            state.sendUncommitted(tableRefs);
            state.checkpointIfNeccessary(plan);
            MutationState lastState = plan.execute();
            state.join(lastState);
            if (connection.getAutoCommit()) {
                connection.commit();
            }
            setLastResultSet(null);
            setLastQueryPlan(null);
            // Unfortunately, JDBC uses an int for update count, so we
            // just max out at Integer.MAX_VALUE
            int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, lastState.getUpdateCount());
            setLastUpdateCount(lastUpdateCount);
            setLastUpdateOperation(stmt.getOperation());
            connection.incrementStatementExecutionCounter();
            return lastUpdateCount;
From the trace above, the stmt here is an ExecutableCreateTableStatement:

    public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
        CreateTableCompiler compiler = new CreateTableCompiler(stmt, this.getOperation());
        return compiler.compile(this);
    }

It then builds the execution plan:

public MutationPlan compile(final CreateTableStatement create) throws SQLException {
    final PhoenixConnection connection = statement.getConnection();
    ColumnResolver resolver = FromCompiler.getResolverForCreation(create, connection);
    PTableType type = create.getTableType();
    PhoenixConnection connectionToBe = connection;
    PTable parentToBe = null;
    ViewType viewTypeToBe = null;
    Scan scan = new Scan();
    final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
    // TODO: support any statement for a VIEW instead of just a WHERE clause
    ParseNode whereNode = create.getWhereClause();
    String viewStatementToBe = null;
    byte[][] viewColumnConstantsToBe = null;
    BitSet isViewColumnReferencedToBe = null;
    if (type == PTableType.VIEW) {
        TableRef tableRef = resolver.getTables().get(0);
        int nColumns = tableRef.getTable().getColumns().size();
        isViewColumnReferencedToBe = new BitSet(nColumns);
        // Used to track column references in a view
        ExpressionCompiler expressionCompiler = new ColumnTrackingExpressionCompiler(context, isViewColumnReferencedToBe);
        parentToBe = tableRef.getTable();
        viewTypeToBe = parentToBe.getViewType() == ViewType.MAPPED ? ViewType.MAPPED : ViewType.UPDATABLE;
        if (whereNode == null) {
            viewStatementToBe = parentToBe.getViewStatement();
        } else {
            whereNode = StatementNormalizer.normalize(whereNode, resolver);
            if (whereNode.isStateless()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_WHERE_IS_CONSTANT)
                        .build().buildException();
            }
            // If our parent has a VIEW statement, combine it with this one
            if (parentToBe.getViewStatement() != null) {
                SelectStatement select = new SQLParser(parentToBe.getViewStatement()).parseQuery().combine(whereNode);
                whereNode = select.getWhere();
            }
            Expression where = whereNode.accept(expressionCompiler);
            if (where != null && !LiteralExpression.isTrue(where)) {
                TableName baseTableName = create.getBaseTableName();
                StringBuilder buf = new StringBuilder();
                whereNode.toSQL(resolver, buf);
                viewStatementToBe = QueryUtil.getViewStatement(baseTableName.getSchemaName(), baseTableName.getTableName(), buf.toString());
            }
            if (viewTypeToBe != ViewType.MAPPED) {
                Long scn = connection.getSCN();
                connectionToBe = (scn != null || tableRef.getTable().isTransactional()) ? connection :
                        // If we have no SCN on our connection and the base table is not transactional, freeze the SCN at when
                        // the base table was resolved to prevent any race condition on
                        // the error checking we do for the base table. The only potential
                        // issue is if the base table lives on a different region server
                        // than the new table will, then we're relying here on the system
                        // clocks being in sync.
                        new PhoenixConnection(
                                // When the new table is created, we still want to cache it
                                // on our connection.
                                new DelegateConnectionQueryServices(connection.getQueryServices()) {
                                    @Override
                                    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
                                        return connection.addTable(table, resolvedTime);
                                    }
                                },
                                connection, tableRef.getTimeStamp());
                viewColumnConstantsToBe = new byte[nColumns][];
                ViewWhereExpressionVisitor visitor = new ViewWhereExpressionVisitor(parentToBe, viewColumnConstantsToBe);
                where.accept(visitor);
                // If view is not updatable, viewColumnConstants should be empty. We will still
                // inherit our parent viewConstants, but we have no additional ones.
                viewTypeToBe = visitor.isUpdatable() ? ViewType.UPDATABLE : ViewType.READ_ONLY;
                if (viewTypeToBe != ViewType.UPDATABLE) {
                    viewColumnConstantsToBe = null;
                }
            }
        }
    }
    final ViewType viewType = viewTypeToBe;
    final String viewStatement = viewStatementToBe;
    final byte[][] viewColumnConstants = viewColumnConstantsToBe;
    final BitSet isViewColumnReferenced = isViewColumnReferencedToBe;
    List<ParseNode> splitNodes = create.getSplitNodes();
    final byte[][] splits = new byte[splitNodes.size()][];
    ImmutableBytesWritable ptr = context.getTempPtr();
    ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
    for (int i = 0; i < splits.length; i++) {
        ParseNode node = splitNodes.get(i);
        if (node instanceof BindParseNode) {
            context.getBindManager().addParamMetaData((BindParseNode) node, VARBINARY_DATUM);
        }
        if (node.isStateless()) {
            Expression expression = node.accept(expressionCompiler);
            if (expression.evaluate(null, ptr)) {
                splits[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                continue;
            }
        }
        throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
                .setMessage("Node: " + node).build().buildException();
    }
    final MetaDataClient client = new MetaDataClient(connectionToBe);
    final PTable parent = parentToBe;

    return new BaseMutationPlan(context, operation) {

        @Override
        public MutationState execute() throws SQLException {
            try {
                // the table is actually created here
                return client.createTable(create, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced);
            } finally {
                if (client.getConnection() != connection) {
                    client.getConnection().close();
                }
            }
        }

        @Override
        public ExplainPlan getExplainPlan() throws SQLException {
            return new ExplainPlan(Collections.singletonList("CREATE TABLE"));
        }
    };
}

You can see that the object created above,

final MetaDataClient client = new MetaDataClient(connectionToBe);

is connected to HBase, and MetaDataClient.createTable is then invoked:

public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, byte[][] viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
    PTable table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, null, null, null);
    if (table == null || table.getType() == PTableType.VIEW || table.isTransactional()) {
        return new MutationState(0,connection);
    }
    // Hack to get around the case when an SCN is specified on the connection.
    // In this case, we won't see the table we just created yet, so we hack
    // around it by forcing the compiler to not resolve anything.
    PostDDLCompiler compiler = new PostDDLCompiler(connection);
    //connection.setAutoCommit(true);
    // Execute any necessary data updates
    Long scn = connection.getSCN();
    long ts = (scn == null ? table.getTimeStamp() : scn);
    // Getting the schema through the current connection doesn't work when the connection has an scn specified
    // Since the table won't be added to the current connection.
    TableRef tableRef = new TableRef(null, table, ts, false);
    byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
    MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), emptyCF, null, null, tableRef.getTimeStamp());
    return connection.getQueryServices().updateData(plan);
}

In the body of MetaDataClient.createTableInternal, depending on what is being created (an HBase table, an index, or a sequence), rows are upserted into the corresponding metadata tables:

PreparedStatement tableUpsert = connection.prepareStatement(CREATE_TABLE);
tableUpsert.setString(1, tenantIdStr);
tableUpsert.setString(2, schemaName);
tableUpsert.setString(3, tableName);
tableUpsert.setString(4, tableType.getSerializedValue());
tableUpsert.setLong(5, PTable.INITIAL_SEQ_NUM);
tableUpsert.setInt(6, position);
if (saltBucketNum != null) {
    tableUpsert.setInt(7, saltBucketNum);
} else {
    tableUpsert.setNull(7, Types.INTEGER);
}
tableUpsert.setString(8, pkName);
tableUpsert.setString(9, dataTableName);
tableUpsert.setString(10, indexState == null ? null : indexState.getSerializedValue());
tableUpsert.setBoolean(11, isImmutableRows);
tableUpsert.setString(12, defaultFamilyName);
tableUpsert.setString(13, viewStatement);
tableUpsert.setBoolean(14, disableWAL);
tableUpsert.setBoolean(15, multiTenant);
if (viewType == null) {
    tableUpsert.setNull(16, Types.TINYINT);
} else {
    tableUpsert.setByte(16, viewType.getSerializedValue());
}
if (indexId == null) {
    tableUpsert.setNull(17, Types.SMALLINT);
} else {
    tableUpsert.setShort(17, indexId);
}
if (indexType == null) {
    tableUpsert.setNull(18, Types.TINYINT);
} else {
    tableUpsert.setByte(18, indexType.getSerializedValue());
}
tableUpsert.setBoolean(19, storeNulls);
if (parent != null && tableType == PTableType.VIEW) {
    tableUpsert.setInt(20, parent.getColumns().size());
} else {
    tableUpsert.setInt(20, BASE_TABLE_BASE_COLUMN_COUNT);
}
tableUpsert.setBoolean(21, transactional);
tableUpsert.setLong(22, updateCacheFrequency);
tableUpsert.execute();

CREATE_TABLE =
"UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE 

As shown, this method upserts rows into the catalog table to persist the metadata.
You can then run SELECT * FROM SYSTEM.CATALOG to see the metadata of every table.
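A hedged sketch of such a check over JDBC (column list trimmed; the WHERE clause roughly skips the per-column rows, and the quorum is made up):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CatalogQueryExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT TABLE_SCHEM, TABLE_NAME, TABLE_TYPE FROM SYSTEM.CATALOG WHERE COLUMN_NAME IS NULL")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "." + rs.getString(2) + " -> " + rs.getString(3));
            }
        }
    }
}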
Once the rows have been inserted into the metadata table, the following method is called to actually create the table in the HBase cluster:

MetaDataMutationResult result = connection.getQueryServices().createTable(
        tableMetaData,
        viewType == ViewType.MAPPED || indexId != null ? physicalNames.get(0).getBytes() : null,
        tableType, tableProps, familyPropList, splits);

Creation of the physical table begins:

if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) {
        // For views this will ensure that metadata already exists
        // For tables and indexes, this will create the metadata if it doesn't already exist
        ensureTableCreated(tableName, tableType, tableProps, families, splits, true);
    }                   

In ConnectionQueryServicesImpl.ensureTableCreated:

 HTableDescriptor newDesc = generateTableDescriptor(tableName, existingDesc, tableType , props, families, splits);

The coprocessors are then added:

private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String, Object> tableProps) throws SQLException {
    // The phoenix jar must be available on HBase classpath
    int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
    try {
        if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) {
            descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, priority, null);
        }
        if (!descriptor.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName())) {
            descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, priority, null);
        }
        if (!descriptor.hasCoprocessor(GroupedAggregateRegionObserver.class.getName())) {
            descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, priority, null);
        }
        if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
            descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
        }
        boolean isTransactional = 
                Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) ||
                Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE
        // TODO: better encapsulation for this
        // Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
        // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
        // all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318).
        if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW)
                && !SchemaUtil.isMetaTable(tableName)
                && !SchemaUtil.isStatsTable(tableName)) {
            if (isTransactional) {
                if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
                    descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
                }
                // For alter table, remove non transactional index coprocessor
                if (descriptor.hasCoprocessor(Indexer.class.getName())) {
                    descriptor.removeCoprocessor(Indexer.class.getName());
                }
            } else {
                if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
                    // If exception on alter table to transition back to non transactional
                    if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
                        descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
                    }
                    Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
                    opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
                    Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
                }
            }
        }
        if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
            descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
                    null, priority, null);
        }

        if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null
                && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(descriptor
                        .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
            if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) {
                descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
                    null, priority, null);
            }
        } else {
            if (!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName())
                    && !SchemaUtil.isMetaTable(tableName)
                    && !SchemaUtil.isSequenceTable(tableName)) {
                descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority, null);
            }
        }

        // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table
        // stay on the same region.
        if (SchemaUtil.isMetaTable(tableName) || SchemaUtil.isFunctionTable(tableName)) {
            if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) {
                descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null);
            }
            if(SchemaUtil.isMetaTable(tableName) ) {
                if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
                    descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null);
                }
            }
        } else if (SchemaUtil.isSequenceTable(tableName)) {
            if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) {
                descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null);
            }
        }

        if (isTransactional) {
            if (!descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
                descriptor.addCoprocessor(PhoenixTransactionalProcessor.class.getName(), null, priority - 10, null);
            }
        } else {
            // If exception on alter table to transition back to non transactional
            if (descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
                descriptor.removeCoprocessor(PhoenixTransactionalProcessor.class.getName());
            }                
        }
    } catch (IOException e) {
        throw ServerUtil.parseServerException(e);
    }
}

As the code above shows, a whole series of coprocessors is added. The core of Phoenix is using coprocessors to do the data processing on the server side; depending on whether the table
is a local index table and whether it is transactional, different coprocessors are installed.
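One way to see the effect of addCoprocessors is to look at the resulting table descriptor with the plain HBase client API. A hedged sketch (DEMO is a hypothetical Phoenix table created earlier; the HBaseAdmin API shown is the pre-1.0 style used in this era of Phoenix):

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class ListPhoenixCoprocessors {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            HTableDescriptor desc = admin.getTableDescriptor(TableName.valueOf("DEMO"));
            // Expect entries such as ScanRegionObserver, UngroupedAggregateRegionObserver,
            // GroupedAggregateRegionObserver, ServerCachingEndpointImpl, Indexer, ...
            List<String> coprocessors = desc.getCoprocessors();
            for (String cp : coprocessors) {
                System.out.println(cp);
            }
        }
    }
}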

The HBase table is then created:

if (splits == null) {
    admin.createTable(newDesc);
} else {
    admin.createTable(newDesc, splits);
}
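The splits argument comes from the DDL. A hedged example of Phoenix DDL that ends up as a pre-split (or salted, i.e. pre-split into SALT_BUCKETS regions) HBase table; the table names and split points are made up:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class PreSplitTableExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement()) {
            // Explicit split points become the byte[][] splits passed to admin.createTable(newDesc, splits).
            stmt.executeUpdate(
                "CREATE TABLE EVENTS (ID VARCHAR PRIMARY KEY, PAYLOAD VARCHAR) SPLIT ON ('d', 'm', 't')");
            // Salting pre-splits the table into SALT_BUCKETS regions.
            stmt.executeUpdate(
                "CREATE TABLE METRICS (ID VARCHAR PRIMARY KEY, VAL DOUBLE) SALT_BUCKETS = 8");
        }
    }
}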

Once the table itself has actually been created, index tables are created next, depending on whether it is a view or a physical table:

    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
    if (tableType == PTableType.INDEX) { // Index on view
        // Physical index table created up front for multi tenant
        // TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to attempt to create it
        if (physicalTableName != null) {
            if (localIndexTable) {
                ensureLocalIndexTableCreated(tableName, tableProps, families, splits, MetaDataUtil.getClientTimeStamp(m));
            } else if (!MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) {
                ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes), physicalTableName, MetaDataUtil.getClientTimeStamp(m));
            }
        }
    } else if (tableType == PTableType.TABLE && MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) { // Create view index table up front for multi tenant tables
        ptr.set(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
        MetaDataUtil.getMutationValue(m, PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES, kvBuilder, ptr);
        List<Pair<byte[], Map<String, Object>>> familiesPlusDefault = null;
        for (Pair<byte[], Map<String, Object>> family : families) {
            byte[] cf = family.getFirst();
            if (Bytes.compareTo(cf, 0, cf.length, ptr.get(), ptr.getOffset(),ptr.getLength()) == 0) {
                familiesPlusDefault = families;
                break;
            }
        }
        // Don't override if default family already present
        if (familiesPlusDefault == null) {
            byte[] defaultCF = ByteUtil.copyKeyBytesIfNecessary(ptr);
            // Only use splits if table is salted, otherwise it may not be applicable
            // Always add default column family, as we don't know in advance if we'll need it
            familiesPlusDefault = Lists.newArrayList(families);
            familiesPlusDefault.add(new Pair<byte[], Map<String, Object>>(defaultCF, Collections.<String, Object>emptyMap()));
        }
        ensureViewIndexTableCreated(tableName, tableProps, familiesPlusDefault, MetaDataUtil.isSalted(m, kvBuilder, ptr) ? splits : null, MetaDataUtil.getClientTimeStamp(m));
    }                   
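For context, these index-creation branches are driven from SQL by statements like the following hedged sketch (table and index names are made up):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class LocalIndexExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(
                "CREATE TABLE ORDERS (ID BIGINT PRIMARY KEY, CUSTOMER VARCHAR, AMOUNT DECIMAL)");
            // A local index: its data is kept region-colocated with the data table,
            // the case handled by ensureLocalIndexTableCreated described below.
            stmt.executeUpdate("CREATE LOCAL INDEX ORDERS_CUST_IDX ON ORDERS (CUSTOMER)");
            // A global index, by contrast, is a separate physical HBase table.
            stmt.executeUpdate("CREATE INDEX ORDERS_AMT_IDX ON ORDERS (AMOUNT)");
        }
    }
}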

As you can see above, if this is an index on a view and a local index table (a Phoenix feature: the index table's regions are co-located with the data regions on the same region server instance),
ensureLocalIndexTableCreated is called to create the local index table. At the lowest level this again reaches
ConnectionQueryServicesImpl.ensureTableCreated, which performs the actual creation, so that method is a core one:
it adds different coprocessors depending on the table type and its properties.
Once the table being created and any related index tables exist, a remote RPC to the coprocessor on the SYSTEM.CATALOG table is made
to check remotely whether the relevant tables have been created and are present in the metadata table:

    byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
    MetaDataMutationResult result = metaDataCoprocessorExec(tableKey,
        new Batch.Call<MetaDataService, MetaDataResponse>() {
        @Override
                public MetaDataResponse call(MetaDataService instance) throws IOException {
                    ServerRpcController controller = new ServerRpcController();
                    BlockingRpcCallback<MetaDataResponse> rpcCallback =
                            new BlockingRpcCallback<MetaDataResponse>();
                    CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
                    for (Mutation m : tableMetaData) {
                        MutationProto mp = ProtobufUtil.toProto(m);
                        builder.addTableMetadataMutations(mp.toByteString());
                    }
                    builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                    CreateTableRequest build = builder.build();
                    instance.createTable(controller, build, rpcCallback);
                    if(controller.getFailedOn() != null) {
                        throw controller.getFailedOn();
                    }
                    return rpcCallback.get();
                }
    });

metaDataCoprocessorExec(tableKey, callable, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);

The call sequence is thus quite rigorous, guarding against the metadata tables not being created completely and correctly.
The remote call lands in MetaDataEndpointImpl.createTable:

   @Override
public void createTable(RpcController controller, CreateTableRequest request,
RpcCallback<MetaDataResponse> done) {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
byte[][] rowKeyMetaData = new byte[3][];
byte[] schemaName = null;
byte[] tableName = null;
try {
List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];

byte[] parentSchemaName = null;
byte[] parentTableName = null;
PTableType tableType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
byte[] parentTableKey = null;
Mutation viewPhysicalTableRow = null;
if (tableType == PTableType.VIEW) {
byte[][] parentSchemaTableNames = new byte[2][];
/*
 * For a view, we lock the base physical table row. For a mapped view, there is 
 * no link present to the physical table. So the viewPhysicalTableRow is null
 * in that case.
 */
viewPhysicalTableRow = getPhysicalTableForView(tableMetadata, parentSchemaTableNames);
parentSchemaName = parentSchemaTableNames[0];
parentTableName = parentSchemaTableNames[1];
if (parentTableName != null) {
parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, parentSchemaName, parentTableName);
}
} else if (tableType == PTableType.INDEX) {
parentSchemaName = schemaName;
/* 
 * For an index we lock the parent table's row which could be a physical table or a view.
 * If the parent table is a physical table, then the tenantIdBytes is empty because
 * we allow creating an index with a tenant connection only if the parent table is a view.
 */ 
parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName);
}

Region region = env.getRegion();
List<RowLock> locks = Lists.newArrayList();
// Place a lock using key for the table to be created
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
try {
acquireLock(region, tableKey, locks);

// If the table key resides outside the region, return without doing anything
MetaDataMutationResult result = checkTableKeyInRegion(tableKey, region);
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
return;
}

long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
ImmutableBytesPtr parentCacheKey = null;
if (parentTableName != null) {
// Check if the parent table resides in the same region. If not, don't worry about locking the parent table row
// or loading the parent table. For a view, the parent table that needs to be locked is the base physical table.
// For an index on view, the view header row needs to be locked. 
result = checkTableKeyInRegion(parentTableKey, region);
if (result == null) {
acquireLock(region, parentTableKey, locks);
parentCacheKey = new ImmutableBytesPtr(parentTableKey);
PTable parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp,
clientTimeStamp);
if (parentTable == null || isTableDeleted(parentTable)) {
builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
long parentTableSeqNumber;
if (tableType == PTableType.VIEW && viewPhysicalTableRow != null && request.hasClientVersion()) {
// Starting 4.5, the client passes the sequence number of the physical table in the table metadata.
parentTableSeqNumber = MetaDataUtil.getSequenceNumber(viewPhysicalTableRow);
} else if (tableType == PTableType.VIEW && !request.hasClientVersion()) {
// Before 4.5, due to a bug, the parent table key wasn't available.
// So don&#39;t do anything and prevent the exception from being thrown.
parentTableSeqNumber = parentTable.getSequenceNumber();
} else {
parentTableSeqNumber = MetaDataUtil.getParentSequenceNumber(tableMetadata);
}
// If parent table isn&#39;t at the expected sequence number, then return
if (parentTable.getSequenceNumber() != parentTableSeqNumber) {
builder.setReturnCode(MetaDataProtos.MutationCode.CONCURRENT_TABLE_MUTATION);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setTable(PTableImpl.toProto(parentTable));
done.run(builder.build());
return;
}
}
}
// Load child table next
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
// Get as of latest timestamp so we can detect if we have a newer table that already
// exists without making an additional query
PTable table =
loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
if (table != null) {
if (table.getTimeStamp() < clientTimeStamp) {
// If the table is older than the client time stamp and it's deleted,
// continue
if (!isTableDeleted(table)) {
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setTable(PTableImpl.toProto(table));
done.run(builder.build());
return;
}
} else {
builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setTable(PTableImpl.toProto(table));
done.run(builder.build());
return;
}
}
// Add cell for ROW_KEY_ORDER_OPTIMIZABLE = true, as we know that new tables
// conform the correct row key. The exception is for a VIEW, which the client
// sends over depending on its base physical table.
if (tableType != PTableType.VIEW) {
UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, tableKey, clientTimeStamp);
}
// TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
// system table. Basically, we get all the locks that we don't already hold for all the
// tableMetadata rows. This ensures we don't have deadlock situations (ensuring
// primary and then index table locks are held, in that order). For now, we just don't support
// indexing on the system table. This is an issue because of the way we manage batch mutation
// in the Indexer.
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);

// Invalidate the cache - the next getTable call will add it
// TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
if (parentCacheKey != null) {
metaDataCache.invalidate(parentCacheKey);
}
metaDataCache.invalidate(cacheKey);
// Get timeStamp from mutations - the above method sets it if it&#39;s unset
long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
builder.setMutationTime(currentTimeStamp);
done.run(builder.build());
return;
} finally {
region.releaseRowLocks(locks);
}
} catch (Throwable t) {
logger.error("createTable failed", t);
ProtobufUtil.setControllerException(controller,
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
}
}

The above is the remote coprocessor. It performs the consistency checks on the related tables: whether a parent table of the same name already exists,
whether another client is concurrently creating the same table, and so on.

    // Hack to get around the case when an SCN is specified on the connection.
    // In this case, we won't see the table we just created yet, so we hack
    // around it by forcing the compiler to not resolve anything.
    PostDDLCompiler compiler = new PostDDLCompiler(connection);
    //connection.setAutoCommit(true);
    // Execute any necessary data updates
    Long scn = connection.getSCN();
    long ts = (scn == null ? table.getTimeStamp() : scn);
    // Getting the schema through the current connection doesn't work when the connection has an scn specified
    // Since the table won't be added to the current connection.
    TableRef tableRef = new TableRef(null, table, ts, false);
    byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
    MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), emptyCF, null, null, tableRef.getTimeStamp());
    return connection.getQueryServices().updateData(plan);

Next, via the PostDDLCompiler plan above, the related cleanup work is carried out, for example after a DROP TABLE or ALTER TABLE DROP COLUMN the obsolete column values are cleared,
and Phoenix's statistics table is updated (the table statistics metadata Phoenix uses when optimizing execution plans):

 connection.setAutoCommit(true);
                SQLException sqlE = null;
                /*
                 * Handles:
                 * 1) deletion of all rows for a DROP TABLE and subsequently deletion of all rows for a DROP INDEX;
                 * 2) deletion of all column values for a ALTER TABLE DROP COLUMN
                 * 3) updating the necessary rows to have an empty KV
                 * 4) updating table stats
                 */
                long totalMutationCount = 0;
                for (final TableRef tableRef : tableRefs) {
                    Scan scan = ScanUtil.newScan(context.getScan());
                    SelectStatement select = SelectStatement.COUNT_ONE;
                    // We need to use this tableRef
                    ColumnResolver resolver = new ColumnResolver() {
                        @Override
                        public List<TableRef> getTables() {
                            return Collections.singletonList(tableRef);
                        }

                        @Override
                        public java.util.List<PFunction> getFunctions() {
                            return Collections.emptyList();
                        };

                        @Override
                        public TableRef resolveTable(String schemaName, String tableName)
                                throws SQLException {
                            throw new UnsupportedOperationException();
                        }
                        @Override
                        public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException {
                            PColumn column = tableName != null
                                    ? tableRef.getTable().getColumnFamily(tableName).getColumn(colName)
                                    : tableRef.getTable().getColumn(colName);
                            return new ColumnRef(tableRef, column.getPosition());
                        }

                        @Override
                        public PFunction resolveFunction(String functionName) throws SQLException {
                            throw new UnsupportedOperationException();
                        };

                        @Override
                        public boolean hasUDFs() {
                            return false;
                        };
                    };
                    PhoenixStatement statement = new PhoenixStatement(connection);
                    StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
                    long ts = timestamp;
                    // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
                    // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
                    // in this case, so maybe this is ok.
                    if (ts!=HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
                        ts = TransactionUtil.convertToNanoseconds(ts);
                    }
                    ScanUtil.setTimeRange(scan, ts);
                    if (emptyCF != null) {
                        scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
                    }
                    ServerCache cache = null;
                    try {
                        if (deleteList != null) {
                            if (deleteList.isEmpty()) {
                                scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
                                // In the case of a row deletion, add index metadata so mutable secondary indexing works
                                /* TODO: we currently manually run a scan to delete the index data here
                                ImmutableBytesWritable ptr = context.getTempPtr();
                                tableRef.getTable().getIndexMaintainers(ptr);
                                if (ptr.getLength() > 0) {
                                    IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
                                    cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
                                    byte[] uuidValue = cache.getId();
                                    scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                }
                                */
                            } else {
                                // In the case of the empty key value column family changing, do not send the index
                                // metadata, as we're currently managing this from the client. It's possible for the
                                // data empty column family to stay the same, while the index empty column family
                                // changes.
                                PColumn column = deleteList.get(0);
                                if (emptyCF == null) {
                                    scan.addColumn(column.getFamilyName().getBytes(), column.getName().getBytes());
                                }
                                scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes());
                                scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, column.getName().getBytes());
                            }
                        }
                        List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
                        if (projectCF == null) {
                            for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
                                columnFamilies.add(family.getName().getBytes());
                            }
                        } else {
                            columnFamilies.add(projectCF);
                        }
                        // Need to project all column families into the scan, since we haven't yet created our empty key value
                        RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
                        // Explicitly project these column families and don't project the empty key value,
                        // since at this point we haven't added the empty key value everywhere.
                        if (columnFamilies != null) {
                            scan.getFamilyMap().clear();
                            for (byte[] family : columnFamilies) {
                                scan.addFamily(family);
                            }
                            projector = new RowProjector(projector,false);
                        }
                        // Ignore exceptions due to not being able to resolve any view columns,
                        // as this just means the view is invalid. Continue on and try to perform
                        // any other Post DDL operations.
                        try {
                            // Since dropping a VIEW does not affect the underlying data, we do
                            // not need to pass through the view statement here.
                            WhereCompiler.compile(context, select); // Push where clause into scan
                        } catch (ColumnFamilyNotFoundException e) {
                            continue;
                        } catch (ColumnNotFoundException e) {
                            continue;
                        } catch (AmbiguousColumnException e) {
                            continue;
                        }
                        QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
                        try {
                            ResultIterator iterator = plan.iterator();
                            try {
                                Tuple row = iterator.next();
                                ImmutableBytesWritable ptr = context.getTempPtr();
                                totalMutationCount += (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
                            } catch (SQLException e) {
                                sqlE = e;
                            } finally {
                                try {
                                    iterator.close();
                                } catch (SQLException e) {
                                    if (sqlE == null) {
                                        sqlE = e;
                                    } else {
                                        sqlE.setNextException(e);
                                    }
                                } finally {
                                    if (sqlE != null) {
                                        throw sqlE;
                                    }
                                }
                            }
                        } catch (TableNotFoundException e) {
                            // Ignore and continue, as HBase throws when table hasn't been written to
                            // FIXME: Remove if this is fixed in 0.96
                        }
                    } finally {
                        if (cache != null) { // Remove server cache if there is one
                            cache.close();
                        }
                    }

                }
                final long count = totalMutationCount;

The code above is that statistics and cleanup logic.
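A hedged sketch of how those statistics surface to a user: SYSTEM.STATS can be queried like any other table, and EXPLAIN shows the plan the optimizer builds with their help (ORDERS is a made-up table; the exact SYSTEM.STATS columns vary by Phoenix version):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class StatsAndExplainExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement()) {
            // Guide-post statistics collected per physical table / column family.
            try (ResultSet rs = stmt.executeQuery("SELECT * FROM SYSTEM.STATS LIMIT 10")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
            // EXPLAIN prints the plan chosen with the help of those statistics.
            try (ResultSet rs = stmt.executeQuery("EXPLAIN SELECT COUNT(*) FROM ORDERS")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }
}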

With this whole flow complete, the CREATE_TABLE_METADATA statement has succeeded and the SYSTEM.CATALOG table exists.
Back in ConnectionQueryServicesImpl.init, the other metadata tables are then created:

metaConnection.createStatement().executeUpdate(
        QueryConstants.CREATE_STATS_TABLE_METADATA);

metaConnection.createStatement().executeUpdate(
        QueryConstants.CREATE_FUNCTION_METADATA);

Once these statements have executed, control returns to PhoenixDriver.getConnectionQueryServices.

At this point the driver object is fully set up.
Now let's lay out the call flow between the classes.

PhoenixDriver (which extends PhoenixEmbeddedDriver) calls the createConnection method.

That in turn calls ConnectionQueryServicesImpl.init(url, info).

A PhoenixConnection object is created.

The metadata table is then created: metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA).

A PhoenixStatement object is created and executeUpdate is called.

An ExecutableCreateTableStatement object (stmt) is created, which in turn creates a CreateTableCompiler.

A MetaDataClient object is created and client.createTable(create, splits, parent, viewStatement, viewType,
viewColumnConstants, isViewColumnReferenced) is invoked.

The createTableInternal method is then called.

connection.getQueryServices().createTable(tableMetaData, ...) is then called to perform the actual table creation.

This reaches ConnectionQueryServicesImpl.createTable, which calls ensureTableCreated to physically create the table.

From this flow you can see that Phoenix essentially parses a SQL statement and then calls the native HBase API to carry out the table creation.
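Putting the whole flow together, a hedged end-to-end sketch: issue the DDL through Phoenix, then confirm with the native HBase API that an ordinary physical table was created (names and the quorum are made up; the HBaseAdmin API is the pre-1.0 style):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class EndToEndCreateTableExample {
    public static void main(String[] args) throws Exception {
        // 1. The DDL travels PhoenixStatement -> CreateTableCompiler -> MetaDataClient
        //    -> ConnectionQueryServicesImpl.createTable -> ensureTableCreated.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS DEMO (ID BIGINT PRIMARY KEY, NAME VARCHAR)");
        }

        // 2. The end result is an ordinary HBase table (plus rows in SYSTEM.CATALOG).
        Configuration conf = HBaseConfiguration.create();
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            System.out.println("physical table exists: " + admin.tableExists(TableName.valueOf("DEMO")));
        }
    }
}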
