HiveSQL Source Code Analysis

Author: melanc | Category: Distributed | Published: 2019-06-05 17:14

Hive compiles a SQL statement in stages: the parser turns HQL into an AST; the semantic analyzer resolves the AST into a QB (query block); the logical plan generator turns the QB into an operator (OP) tree; the logical optimizer rewrites that tree; the physical plan generator translates it into a task tree for the execution engine; and the physical optimizer tunes the task tree. The sections below walk through the key entry point of each stage in the Hive source.

1.SQL Parser (HQL -> AST)

The entry point is ParseDriver#parse: an ANTLR-generated lexer (HiveLexerX) and parser (HiveParser) tokenize and parse the command, and the resulting parse tree is returned as an ASTNode.

  /**
   * Parses a command, optionally assigning the parser's token stream to the
   * given context.
   *
   * @param command
   *          command to parse
   *
   * @param ctx
   *          context with which to associate this parser's token stream, or
   *          null if either no context is available or the context already has
   *          an existing stream
   *
   * @return parsed AST
   */
  public ASTNode parse(String command, Context ctx, boolean setTokenRewriteStream)
      throws ParseException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Parsing command: " + command);
    }

    HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
    TokenRewriteStream tokens = new TokenRewriteStream(lexer);
    if (ctx != null) {
      if (setTokenRewriteStream) {
        ctx.setTokenRewriteStream(tokens);
      }
      lexer.setHiveConf(ctx.getConf());
    }
    HiveParser parser = new HiveParser(tokens);
    if (ctx != null) {
      parser.setHiveConf(ctx.getConf());
    }
    parser.setTreeAdaptor(adaptor);
    HiveParser.statement_return r = null;
    try {
      r = parser.statement();
    } catch (RecognitionException e) {
      e.printStackTrace();
      throw new ParseException(parser.errors);
    }

    if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
      LOG.debug("Parse Completed");
    } else if (lexer.getErrors().size() != 0) {
      throw new ParseException(lexer.getErrors());
    } else {
      throw new ParseException(parser.errors);
    }

    ASTNode tree = (ASTNode) r.getTree();
    tree.setUnknownTokenBoundaries();
    return tree;
  }
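
For reference, a minimal sketch of driving this stage on its own, assuming hive-exec (2.x-era APIs, matching the code above) is on the classpath; the query string is arbitrary:

  import org.apache.hadoop.hive.ql.parse.ASTNode;
  import org.apache.hadoop.hive.ql.parse.ParseDriver;

  public class ParseDemo {
    public static void main(String[] args) throws Exception {
      ParseDriver pd = new ParseDriver();
      // A null Context is acceptable here: parse() only uses it to attach
      // the token rewrite stream and to pass the HiveConf along.
      ASTNode ast = pd.parse("SELECT key, count(1) FROM src GROUP BY key", null);
      // dump() pretty-prints the tree, rooted at TOK_QUERY.
      System.out.println(ast.dump());
    }
  }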

2.Semantic Analyzer (AST -> QB)

SemanticAnalyzer#analyzeInternal drives everything after parsing: it resolves the AST into a QB (query block), generates the operator tree, deduces the result schema, runs the logical optimizer, and hands the plan to the task compiler. The numbered comments below mark each phase.

  void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
    // 1. Generate Resolved Parse tree from syntax tree
    LOG.info("Starting Semantic Analysis");
    if (!genResolvedParseTree(ast, plannerCtx)) {
      return;
    }

    // 2. Gen OP Tree from resolved Parse Tree
    Operator sinkOp = genOPTree(ast, plannerCtx);

    if (!unparseTranslator.isEnabled() && tableMask.isEnabled()) {
      // Here we rewrite the * and also the masking table
      ASTNode tree = rewriteASTWithMaskAndFilter(ast);
      if (tree != ast) {
        ctx.setSkipTableMasking(true);
        init(true);
        genResolvedParseTree(tree, plannerCtx);
        if (this instanceof CalcitePlanner) {
          ((CalcitePlanner) this).resetCalciteConfiguration();
        }
        sinkOp = genOPTree(tree, plannerCtx);
      }
    }

    // 3. Deduce Resultset Schema
    if (createVwDesc != null) {
      resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
    } else {
      // resultSchema will be null if
      // (1) cbo is disabled;
      // (2) or cbo is enabled with AST return path (whether succeeded or not,
      // resultSchema will be re-initialized)
      // It will only be not null if cbo is enabled with new return path and it
      // succeeds.
      if (resultSchema == null) {
        resultSchema = convertRowSchemaToResultSetSchema(opParseCtx.get(sinkOp).getRowResolver(),
            HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));
      }
    }

    // 4. Generate Parse Context for Optimizer & Physical compiler
    copyInfoToQueryProperties(queryProperties);
    ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
        new HashSet<JoinOperator>(joinContext.keySet()),
        new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
        loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
        listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
        globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
        viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, acidFileSinks);

    // 5. Take care of view creation
    if (createVwDesc != null) {
      saveViewDefinition();

      // validate the create view statement; at this point the createVwDesc has
      // all the information needed for the semantic check
      validateCreateView(createVwDesc);

      // Since we're only creating a view (not executing it), we don't need to
      // optimize or translate the plan (and in fact, those procedures can
      // interfere with the view creation). So skip the rest of this method.
      ctx.setResDir(null);
      ctx.setResFile(null);

      try {
        PlanUtils.addInputsForView(pCtx);
      } catch (HiveException e) {
        throw new SemanticException(e);
      }

      // Generate lineage info for create view statements
      // if LineageLogger hook is configured.
      if (HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS).contains(
          "org.apache.hadoop.hive.ql.hooks.LineageLogger")) {
        ArrayList<Transform> transformations = new ArrayList<Transform>();
        transformations.add(new HiveOpConverterPostProc());
        transformations.add(new Generator());
        for (Transform t : transformations) {
          pCtx = t.transform(pCtx);
        }
      }
      return;
    }

    // 6. Generate table access stats if required
    if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {
      TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);
      setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());
    }

    // 7. Perform Logical optimization
    if (LOG.isDebugEnabled()) {
      LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
    }
    Optimizer optm = new Optimizer();
    optm.setPctx(pCtx);
    optm.initialize(conf);
    pCtx = optm.optimize();
    if (pCtx.getColumnAccessInfo() != null) {
      // set ColumnAccessInfo for view column authorization
      setColumnAccessInfo(pCtx.getColumnAccessInfo());
    }
    FetchTask origFetchTask = pCtx.getFetchTask();
    if (LOG.isDebugEnabled()) {
      LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
    }

    // 8. Generate column access stats if required - wait until column pruning
    // takes place during optimization
    boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()
        && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
    if (isColumnInfoNeedForAuth
        || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
      ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);
      // view column access info is carried by this.getColumnAccessInfo().
      setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));
    }

    // 9. Optimize Physical op tree & Translate to target execution engine (MR,
    // TEZ..)
    if (!ctx.getExplainLogical()) {
      TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
      compiler.init(queryState, console, db);
      compiler.compile(pCtx, rootTasks, inputs, outputs);
      fetchTask = pCtx.getFetchTask();
    }
    LOG.info("Completed plan generation");

    // 10. put accessed columns to readEntity
    if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
      putAccessedColumnsToReadEntity(inputs, columnAccessInfo);
    }

    // 11. if desired, check that we're not going over partition scan limits
    if (!ctx.getExplain()) {
      enforceScanLimits(pCtx, origFetchTask);
    }

    return;
  }
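
For orientation, this is roughly how Driver#compile reaches analyzeInternal (a condensed sketch, not verbatim; exact signatures vary slightly across Hive versions):

    // Condensed from Driver#compile (sketch).
    ASTNode tree = ParseUtils.parse(command, ctx);             // stage 1: HQL -> AST
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
    sem.analyze(tree, ctx);                                    // dispatches into analyzeInternal
    sem.validate();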

3.Logical Plan Gen (QB -> OP Tree)

SemanticAnalyzer#genPlan builds the operator tree for a query block: it recurses into subqueries, generates a TableScan for each source table, wires in PTFs, lateral views, and joins, and finally calls genBodyPlan for the WHERE/GROUP BY/SELECT body. For example, for SELECT t.key FROM (SELECT key FROM src) t, the subquery alias t is planned first via the recursive genPlan(qb, qbexpr) call and registered in aliasToOpInfo, and the outer body plan is then built on top of its operator.

  public Operator genPlan(QB qb, boolean skipAmbiguityCheck)
      throws SemanticException {

    // First generate all the opInfos for the elements in the from clause
    // Must be deterministic order map - see HIVE-8707
    Map<String, Operator> aliasToOpInfo = new LinkedHashMap<String, Operator>();

    // Recurse over the subqueries to fill the subquery part of the plan
    for (String alias : qb.getSubqAliases()) {
      QBExpr qbexpr = qb.getSubqForAlias(alias);
      Operator operator = genPlan(qb, qbexpr);
      aliasToOpInfo.put(alias, operator);
      if (qb.getViewToTabSchema().containsKey(alias)) {
        // we set viewProjectToTableSchema so that we can leverage ColumnPruner.
        if (operator instanceof SelectOperator) {
          if (this.viewProjectToTableSchema == null) {
            this.viewProjectToTableSchema = new LinkedHashMap<>();
          }
          viewProjectToTableSchema.put((SelectOperator) operator, qb.getViewToTabSchema()
              .get(alias));
        } else {
          throw new SemanticException("View " + alias + " is corresponding to "
              + operator.getType().name() + ", rather than a SelectOperator.");
        }
      }
    }

    // Recurse over all the source tables
    for (String alias : qb.getTabAliases()) {
      Operator op = genTablePlan(alias, qb);
      aliasToOpInfo.put(alias, op);
    }

    if (aliasToOpInfo.isEmpty()) {
      qb.getMetaData().setSrcForAlias(DUMMY_TABLE, getDummyTable());
      TableScanOperator op = (TableScanOperator) genTablePlan(DUMMY_TABLE, qb);
      op.getConf().setRowLimit(1);
      qb.addAlias(DUMMY_TABLE);
      qb.setTabAlias(DUMMY_TABLE, DUMMY_TABLE);
      aliasToOpInfo.put(DUMMY_TABLE, op);
    }

    Operator srcOpInfo = null;
    Operator lastPTFOp = null;

    if(queryProperties.hasPTF()){
      //After processing subqueries and source tables, process
      // partitioned table functions

      HashMap<ASTNode, PTFInvocationSpec> ptfNodeToSpec = qb.getPTFNodeToSpec();
      if ( ptfNodeToSpec != null ) {
        for(Entry<ASTNode, PTFInvocationSpec> entry : ptfNodeToSpec.entrySet()) {
          ASTNode ast = entry.getKey();
          PTFInvocationSpec spec = entry.getValue();
          String inputAlias = spec.getQueryInputName();
          Operator inOp = aliasToOpInfo.get(inputAlias);
          if ( inOp == null ) {
            throw new SemanticException(generateErrorMessage(ast,
                "Cannot resolve input Operator for PTF invocation"));
          }
          lastPTFOp = genPTFPlan(spec, inOp);
          String ptfAlias = spec.getFunction().getAlias();
          if ( ptfAlias != null ) {
            aliasToOpInfo.put(ptfAlias, lastPTFOp);
          }
        }
      }

    }

    // For all the source tables that have a lateral view, attach the
    // appropriate operators to the TS
    genLateralViewPlans(aliasToOpInfo, qb);


    // process join
    if (qb.getParseInfo().getJoinExpr() != null) {
      ASTNode joinExpr = qb.getParseInfo().getJoinExpr();

      if (joinExpr.getToken().getType() == HiveParser.TOK_UNIQUEJOIN) {
        QBJoinTree joinTree = genUniqueJoinTree(qb, joinExpr, aliasToOpInfo);
        qb.setQbJoinTree(joinTree);
      } else {
        QBJoinTree joinTree = genJoinTree(qb, joinExpr, aliasToOpInfo);
        qb.setQbJoinTree(joinTree);
        /*
         * if there is only one destination in Query try to push where predicates
         * as Join conditions
         */
        Set<String> dests = qb.getParseInfo().getClauseNames();
        if ( dests.size() == 1 && joinTree.getNoOuterJoin()) {
          String dest = dests.iterator().next();
          ASTNode whereClause = qb.getParseInfo().getWhrForClause(dest);
          if ( whereClause != null ) {
            extractJoinCondsFromWhereClause(joinTree, qb, dest,
                (ASTNode) whereClause.getChild(0),
                aliasToOpInfo );
          }
        }

        if (!disableJoinMerge) {
          mergeJoinTree(qb);
        }
      }

      // if any filters are present in the join tree, push them on top of the
      // table
      pushJoinFilters(qb, qb.getQbJoinTree(), aliasToOpInfo);
      srcOpInfo = genJoinPlan(qb, aliasToOpInfo);
    } else {
      // Now if there are more than 1 sources then we have a join case
      // later we can extend this to the union all case as well
      srcOpInfo = aliasToOpInfo.values().iterator().next();
      // with PTFs, there may be more (note for PTF chains:
      // one PTF invocation may entail multiple PTF operators)
      srcOpInfo = lastPTFOp != null ? lastPTFOp : srcOpInfo;
    }

    Operator bodyOpInfo = genBodyPlan(qb, srcOpInfo, aliasToOpInfo);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Created Plan for Query Block " + qb.getId());
    }

    if (qb.getAlias() != null) {
      rewriteRRForSubQ(qb.getAlias(), bodyOpInfo, skipAmbiguityCheck);
    }

    setQB(qb);
    return bodyOpInfo;
  }

  private Operator genBodyPlan(QB qb, Operator input, Map<String, Operator> aliasToOpInfo)
      throws SemanticException {
    QBParseInfo qbp = qb.getParseInfo();

    TreeSet<String> ks = new TreeSet<String>(qbp.getClauseNames());
    Map<String, Operator<? extends OperatorDesc>> inputs = createInputForDests(qb, input, ks);

    Operator curr = input;

    List<List<String>> commonGroupByDestGroups = null;

    // If we can put multiple group bys in a single reducer, determine suitable groups of
    // expressions, otherwise treat all the expressions as a single group
    if (conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
      try {
        commonGroupByDestGroups = getCommonGroupByDestGroups(qb, inputs);
      } catch (SemanticException e) {
        LOG.error("Failed to group clauses by common spray keys.", e);
      }
    }

    if (commonGroupByDestGroups == null) {
      commonGroupByDestGroups = new ArrayList<List<String>>();
      commonGroupByDestGroups.add(new ArrayList<String>(ks));
    }

    if (!commonGroupByDestGroups.isEmpty()) {

      // Iterate over each group of subqueries with the same group by/distinct keys
      for (List<String> commonGroupByDestGroup : commonGroupByDestGroups) {
        if (commonGroupByDestGroup.isEmpty()) {
          continue;
        }

        String firstDest = commonGroupByDestGroup.get(0);
        input = inputs.get(firstDest);

        // Constructs a standard group by plan if:
        // There is no other subquery with the same group by/distinct keys or
        // (There are no aggregations in a representative query for the group and
        // There is no group by in that representative query) or
        // The data is skewed or
        // The conf variable used to control combining group bys into a single reducer is false
        if (commonGroupByDestGroup.size() == 1 ||
            (qbp.getAggregationExprsForClause(firstDest).size() == 0 &&
            getGroupByForClause(qbp, firstDest).size() == 0) ||
            conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) ||
            !conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {

          // Go over all the destination tables
          for (String dest : commonGroupByDestGroup) {
            curr = inputs.get(dest);

            if (qbp.getWhrForClause(dest) != null) {
              ASTNode whereExpr = qb.getParseInfo().getWhrForClause(dest);
              curr = genFilterPlan((ASTNode) whereExpr.getChild(0), qb, curr, aliasToOpInfo, false, false);
            }
            // Preserve operator before the GBY - we'll use it to resolve '*'
            Operator<?> gbySource = curr;

            if ((qbp.getAggregationExprsForClause(dest).size() != 0
                || getGroupByForClause(qbp, dest).size() > 0)
                    && (qbp.getSelForClause(dest).getToken().getType() != HiveParser.TOK_SELECTDI
                      || qbp.getWindowingExprsForClause(dest) == null)) {
              // multiple distincts are not supported with skew in data
              if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
                  qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
                throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
                    getMsg());
              }
              // insert a select operator here used by the ColumnPruner to reduce
              // the data to shuffle
              curr = genSelectAllDesc(curr);
              // Check and transform group by *. This will only happen for select distinct *.
              // Here the "genSelectPlan" is being leveraged.
              // The main benefits are (1) remove virtual columns that should
              // not be included in the group by; (2) add the fully qualified column names to unParseTranslator
              // so that view is supported. The drawback is that an additional SEL op is added. If it is
              // not necessary, it will be removed by NonBlockingOpDeDupProc Optimizer because it will match
              // SEL%SEL% rule.
              ASTNode selExprList = qbp.getSelForClause(dest);
              if (selExprList.getToken().getType() == HiveParser.TOK_SELECTDI
                  && selExprList.getChildCount() == 1 && selExprList.getChild(0).getChildCount() == 1) {
                ASTNode node = (ASTNode) selExprList.getChild(0).getChild(0);
                if (node.getToken().getType() == HiveParser.TOK_ALLCOLREF) {
                  curr = genSelectPlan(dest, qb, curr, curr);
                  RowResolver rr = opParseCtx.get(curr).getRowResolver();
                  qbp.setSelExprForClause(dest, SemanticAnalyzer.genSelectDIAST(rr));
                }
              }
              if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
                if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
                  curr = genGroupByPlanMapAggrNoSkew(dest, qb, curr);
                } else {
                  curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
                }
              } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
                curr = genGroupByPlan2MR(dest, qb, curr);
              } else {
                curr = genGroupByPlan1MR(dest, qb, curr);
              }
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("RR before GB " + opParseCtx.get(gbySource).getRowResolver()
                  + " after GB " + opParseCtx.get(curr).getRowResolver());
            }

            curr = genPostGroupByBodyPlan(curr, dest, qb, aliasToOpInfo, gbySource);
          }
        } else {
          curr = genGroupByPlan1ReduceMultiGBY(commonGroupByDestGroup, qb, input, aliasToOpInfo);
        }
      }
    }


    if (LOG.isDebugEnabled()) {
      LOG.debug("Created Body Plan for Query Block " + qb.getId());
    }

    return curr;
  }
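
To make the result concrete: with the default hive.map.aggr=true and no skew, a query such as SELECT key, count(1) FROM src GROUP BY key takes the genGroupByPlanMapAggrNoSkew branch above and yields roughly this operator chain (names abbreviated):

  TS(src) -> SEL -> GBY(hash, map side) -> RS(shuffle on key) -> GBY(mergepartial) -> SEL -> FS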

4.Logical Optimizer (Rewrite OP Tree)

Optimizer#optimize simply applies each registered Transform to the ParseContext, in order.

  /**
   * Invoke all the transformations one-by-one, and alter the query plan.
   *
   * @return ParseContext
   * @throws SemanticException
   */
  public ParseContext optimize() throws SemanticException {
    for (Transform t : transformations) {
      t.beginPerfLogging();
      pctx = t.transform(pctx);
      t.endPerfLogging(t.toString());
    }
    return pctx;
  }
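
The transformations list is populated in Optimizer#initialize, gated by configuration flags. A condensed sketch (the exact transform set and gating flags are version-dependent):

    // Condensed from Optimizer#initialize (sketch, not verbatim).
    transformations = new ArrayList<Transform>();
    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
      transformations.add(new PredicatePushDown());   // hive.optimize.ppd
    }
    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) {
      transformations.add(new ColumnPruner());        // hive.optimize.cp
    }
    transformations.add(new PartitionPruner());
    // ... SamplePruner, GroupByOptimizer, MapJoinProcessor,
    // ReduceSinkDeDuplication, NonBlockingOpDeDupProc, etc. follow.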

5.Physical Plan Gen (OP Tree -> Task Tree)

For the MapReduce engine, MapReduceCompiler#generateTaskTree walks the operator tree with a rule-based dispatcher: each RuleRegExp matches a pattern of operator names along the walked path, and the matching processor cuts the operator tree into map-reduce tasks.

  protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
      List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {

    // generate map reduce plans
    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
    GenMRProcContext procCtx = new GenMRProcContext(
        conf,
        // Must be deterministic order map for consistent q-test output across Java versions
        new LinkedHashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>(),
        tempParseContext, mvTask, rootTasks,
        new LinkedHashMap<Operator<? extends OperatorDesc>, GenMapRedCtx>(),
        inputs, outputs);

    // create a walker which walks the tree in a DFS manner while maintaining
    // the operator stack.
    // The dispatcher generates the plan from the operator tree
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp(new String("R1"),
        TableScanOperator.getOperatorName() + "%"),
        new GenMRTableScan1());
    opRules.put(new RuleRegExp(new String("R2"),
        TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
        new GenMRRedSink1());
    opRules.put(new RuleRegExp(new String("R3"),
        ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
        new GenMRRedSink2());
    opRules.put(new RuleRegExp(new String("R4"),
        FileSinkOperator.getOperatorName() + "%"),
        new GenMRFileSink1());
    opRules.put(new RuleRegExp(new String("R5"),
        UnionOperator.getOperatorName() + "%"),
        new GenMRUnion1());
    opRules.put(new RuleRegExp(new String("R6"),
        UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
        new GenMRRedSink3());
    opRules.put(new RuleRegExp(new String("R7"),
        MapJoinOperator.getOperatorName() + "%"),
        MapJoinFactory.getTableScanMapJoin());

    // The dispatcher fires the processor corresponding to the closest matching
    // rule and passes the context along
    Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules,
        procCtx);

    GraphWalker ogw = new GenMapRedWalker(disp);
    ArrayList<Node> topNodes = new ArrayList<Node>();
    topNodes.addAll(pCtx.getTopOps().values());
    ogw.startWalking(topNodes, null);
  }
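
The rule strings are regular expressions over the concatenated names of the operators along the walked path: R2, "TS%.*RS%", for example, fires when a ReduceSinkOperator is reached below a TableScanOperator, and GenMRRedSink1 then starts the reduce side of the current map-reduce task. As a sketch of the same lib framework, a custom rule/processor pair could be registered alongside the rules above (the rule name and println are illustrative only):

    opRules.put(
        new RuleRegExp("PrintFileSink", FileSinkOperator.getOperatorName() + "%"),
        new NodeProcessor() {
          @Override
          public Object process(Node nd, Stack<Node> stack,
              NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
            // Fires once per FileSinkOperator encountered during the walk.
            System.out.println("Hit a FileSinkOperator: " + nd.getName());
            return null;
          }
        });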

6.Physical Optimizer (Optimize Task Tree)

MapReduceCompiler#optimizeTaskPlan first severs operator links that now cross task boundaries (breakTaskTree), then runs the PhysicalOptimizer, which applies a list of PhysicalPlanResolvers over the task tree.

  protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
      ParseContext pCtx, Context ctx) throws SemanticException {
    // Reduce sink does not have any kids - since the plan by now has been
    // broken up into multiple tasks, iterate over all tasks.
    // For each task, go over all operators recursively
    for (Task<? extends Serializable> rootTask : rootTasks) {
      breakTaskTree(rootTask);
    }


    PhysicalContext physicalContext = new PhysicalContext(conf,
        getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask());
    PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
        physicalContext, conf);
    physicalOptimizer.optimize();

  }
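
PhysicalOptimizer#initialize assembles its resolver list the same way the logical Optimizer assembles transforms. A condensed sketch (the resolver set is version- and configuration-dependent):

    // Condensed from PhysicalOptimizer#initialize (sketch, not verbatim).
    resolvers = new ArrayList<PhysicalPlanResolver>();
    if (conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
      resolvers.add(new SkewJoinResolver());     // hive.optimize.skewjoin
    }
    if (conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
      resolvers.add(new CommonJoinResolver());   // hive.auto.convert.join
    }
    // ... MapJoinResolver, Vectorizer, StageIDsRearranger, etc. follow;
    // optimize() then runs each resolver over the PhysicalContext.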
