Repost: An Investigation of Postgres-XL
Reposted from: http://www.cnblogs.com/yyvv/p/4188980.html

Origins

Behind this project is a company called StormDB. The entire codebase is based on Postgres-XC; the open-source version appears to be a fork of StormDB's code.
Personal impressions

These are purely my own understanding and not necessarily correct; apologies for any misreadings.
Distributed architecture

Postgres-XL has an official homepage; note that the site pulls some resources from Google APIs, so it can be slow to load at times. Note also that OLAP is listed ahead of OLTP among the advertised features:

Features: Fully ACID
Workloads: OLAP with MPP Parallelism

First, read the official overview carefully; it gives a rough picture of the whole system. In this architecture, Datanodes and Coordinators can each be deployed in multiples, while there is only one GTM (Global Transaction Manager); the figure shows two GTMs because one of them is a standby.

Relationship with Postgres-XC
In fact, the Postgres-XL src tree contains a folder named pgxc. Since the code is based on pgxc, large amounts of the comments and code are still pgxc's. The biggest difference between XL and XC is this: XC's logic is that if a piece of SQL can be pushed down to the datanodes it is pushed down, and otherwise all the data is read up to the coordinator and processed there in one place. XL, by contrast, is MPP in the true sense.

How the code is modified

Relative to PostgreSQL, pgxl's basic approach is to change as little code as possible: some core components must be adjusted, but most of the code is kept identical, and newly added files are placed in new locations. The changes are bracketed like this:

#ifdef PGXC       /* the Postgres-XC changes */
#ifndef XCP       /* Postgres-XL's changes relative to XC */
....
#endif
#endif
GTM
Judging from the code (src/gtm), this component's main job is global transaction management: handing out global_txn_id, timestamps, and so on. Since this is a single point, the standby-related code also lives in this part.

snapshot

/*
 * Get snapshot for the given transactions. If this is the first call in the
 * transaction, a fresh snapshot is taken and returned back. For a serializable
 * transaction, all subsequent calls to the function will return the same
 * snapshot. For a read-committed transaction, a fresh snapshot is taken
 * every time and returned to the caller.
 *
 * The returned snapshot includes xmin (lowest still-running xact ID),
 * xmax (highest completed xact ID + 1), and a list of running xact IDs
 * in the range xmin <= xid < xmax. It is used as follows:
 *     All xact IDs < xmin are considered finished.
 *     All xact IDs >= xmax are considered still running.
 *     For an xact ID xmin <= xid < xmax, consult the list to see whether
 *     it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
 *
 * All running top-level XIDs are included in the snapshot.
 *
 * We also update the following global variables:
 *     RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
 *     running transactions)
 *
 * Note: this function should probably not be called with an argument that's
 * not statically allocated (see xip allocation below).
 */
GTM_Snapshot
GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *status)
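The xmin/xmax/xip triple in this comment is the same snapshot representation stock PostgreSQL exposes at the SQL level. As a quick illustration of the semantics (a plain-PostgreSQL sketch, nothing GTM-specific; the xids shown are made up):

-- txid_current_snapshot() returns 'xmin:xmax:xip_list'
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT txid_current_snapshot();   -- e.g. 100:105:100,102
-- xid < 100          => finished; visible if it committed
-- xid >= 105         => treated as still running, invisible
-- 100 <= xid < 105   => consult the xip list (here 100 and 102 are running)
COMMIT;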
The snapshot's important function is to keep the commit status of transactions synchronized across the nodes.

GTM-Proxy

Because the GTM has to take care of every transaction, it has to read and write an enormous amount of messages, which may restrict Postgres-XL scalability. GTM-Proxy is a proxy of the GTM which groups requests and responses to reduce network reads/writes by the GTM. Distributing one snapshot to multiple transactions also helps reduce the GTM's network workload.

Coordinator
The code is mostly under src/backend/pgxc.

Datanode

The Datanode is very close to PostgreSQL itself because it just handles incoming statements locally. The Coordinator and Datanode share the same binary but their behavior differs a little. The Coordinator decomposes incoming statements into ones handled by the Datanodes. If necessary, the Coordinator materializes the responses from the Datanodes to compute the final response to the application.

How query optimization is implemented

DDL handling

A namespace called STORM_CATALOG_NAMESPACE was added; the newly introduced system tables all live in this namespace. For DDL statements the handling is essentially to forward the command to every Datanode and Coordinator. For the concrete logic, see the function below (a SQL-level sketch of the effect follows after it):

/*
* Execute utility statement on multiple Datanodes
* It does approximately the same as
* RemoteQueryState *state = ExecInitRemoteQuery(plan, estate, flags);
* Assert(TupIsNull(ExecRemoteQuery(state)));
* ExecEndRemoteQuery(state)
* But does not need an Estate instance and does not do some unnecessary work,
* like allocating tuple slots.
*/
void
ExecRemoteUtility(RemoteQuery *node)
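As promised above, the effect of this fan-out can be checked from SQL with EXECUTE DIRECT. A hedged sketch; the table t1 and node name dn1 are made up:

-- Issued on a Coordinator; the DDL is forwarded to every node.
CREATE TABLE t1 (id int, val text);
-- Verify on a single Datanode (dn1 is a hypothetical node name):
EXECUTE DIRECT ON (dn1) 'SELECT tablename FROM pg_tables WHERE tablename = ''t1''';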
directStmt
A directStmt is a statement of the following form:
Synopsis

EXECUTE DIRECT ON ( nodename [, ... ] ) query

Examples

Select some data in a given table tenk1 on remote Datanode named dn1:
EXECUTE DIRECT ON NODE dn1 'SELECT * FROM tenk1 WHERE col_char = ''foo''';

Select local timestamp of a remote node named coord2:
EXECUTE DIRECT ON coord2 'select clock_timestamp()';

Select list of tables of a remote node named dn50:
EXECUTE DIRECT ON dn50 'select tablename from pg_tables';
For details see: http://files.postgres-xl.org/documentation/sql-executedirect.html
In fact, standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) detects a directStmt like this:
/*
 * pgxc_direct_planner
 * The routine tries to see if the statement can be completely evaluated on the
 * datanodes. In such cases coordinator is not needed to evaluate the statement,
 * and just acts as a proxy. A statement can be completely shipped to the remote
 * node if every row of the result can be evaluated on a single datanode.
 * For example:
 *
 * Only EXECUTE DIRECT statements are sent directly as of now
 */
#ifdef XCP
    if (IS_PGXC_COORDINATOR && !IsConnFromCoord() &&
        parse->utilityStmt &&
        IsA(parse->utilityStmt, RemoteQuery))
        return pgxc_direct_planner(parse, cursorOptions, boundParams);
#endif

In short, the SQL is sent straight to the specified node(s).

Non-directStmt

For queries that are not directStmts, things get more complicated: each node may carry out only part of the computation, and the result of that computation may in turn serve as the input to computations on one or more other nodes.
Parse/resolve

At this stage, the most important things happen during plan generation.

Generating the plan

The function below generates a RemoteSubPlan, which behaves like PG's existing plan nodes except that at execution time it reads intermediate results from the network (rather than from memory or disk). When building the plan, the source distribution, the result distribution, and the sort keys must be computed.

/*
 * make_remotesubplan
* Create a RemoteSubplan node to execute subplan on remote nodes.
* leftree - the subplan which we want to push down to remote node.
* resultDistribution - the distribution of the remote result. May be NULL -
* results are coming to the invoking node
 * execDistribution - determines how source data of the subplan are
 *      distributed, where we should send the subplan and how to combine results.
 * pathkeys - the remote subplan is sorted according to these keys, the
 *      executor should perform a merge sort of incoming tuples
*/
RemoteSubplan *
make_remotesubplan(PlannerInfo *root, Plan *lefttree, Distribution *resultDistribution, Distribution *execDistribution, List *pathkeys)
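There is no SQL-level switch for this node type, but its effect shows up in EXPLAIN: plan fragments pushed down to the datanodes appear as remote-subquery nodes. A sketch reusing the hypothetical t1 table from the DDL example above (the exact node labels vary by XL version):

EXPLAIN (COSTS off)
SELECT count(*) FROM t1 WHERE val = 'foo';
-- look for a "Remote Subquery Scan" node: that is the RemoteSubplan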
Redistributing data

/*
 * Set a RemoteSubPath on top of the specified node and set specified
* distribution to it
*/
static Path *
redistribute_path(Path *subpath, char distributionType, Bitmapset *nodes, Bitmapset *restrictNodes, Node *distributionExpr)
SCAN

/*
 * set_scanpath_distribution
* Assign distribution to the path which is a base relation scan.
*/
static void
set_scanpath_distribution(PlannerInfo *root, RelOptInfo *rel, Path *pathnode)
JOIN

Joins are handled in the following function:

/*
* Analyze join parameters and set distribution of the join node.
 * If there are possible alternate distributions the respective paths are
* returned as a list so caller can cost all of them and choose cheapest to
* continue.
*/
static List *
set_joinpath_distribution(PlannerInfo *root, JoinPath *pathnode)
The following cases can be handled without redistributing data (see the SQL sketch after the list):
/*
 * If both subpaths are distributed by replication, the resulting
* distribution will be replicated on smallest common of nodes.
* Catalog tables are the same on all nodes,so treat them as replicated
* on all nodes.
*/
/*
 * Check if we have inner replicated
 * The "both replicated" case is already checked, so if innerd
 * is replicated, then outerd is not replicated and it is not NULL.
 * This is not acceptable for some join types. If outer relation is
 * nullable data nodes will produce joined rows with NULLs for cases when
 * matching row exists, but on other data node.
 */
/*
 * Check if we have outer replicated
 * The "both replicated" case is already checked, so if outerd
 * is replicated, then innerd is not replicated and it is not NULL.
 * This is not acceptable for some join types. If inner relation is
 * nullable data nodes will produce joined rows with NULLs for cases when
 * matching row exists, but on other data node.
 */
/*
* This join is still allowed if inner and outer paths have
* equivalent distribution and joined along the distribution keys.
*/
/*
 * Build cartesian product, if no hashable restrictions are found.
 * Perform the join on the coordinator in such cases. If this join would be
 * a part of a larger join, it will be handled as replicated.
 * To do that leave join distribution NULL and place a RemoteSubPath node on
 * top of each subpath to provide access to joined result sets.
 * Do not redistribute paths that already have NULL distribution, this is
 * possible if performing an outer join on a coordinator and a datanode
 * relations.
 */
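A SQL sketch of the favorable cases above, with invented tables: when both sides are hash-distributed on the join key, every datanode can join its own slice; joining on anything else forces a redistribution.

CREATE TABLE orders (order_id int, cust_id int) DISTRIBUTE BY HASH (cust_id);
CREATE TABLE custs  (cust_id int, name text)    DISTRIBUTE BY HASH (cust_id);

-- Equi-join along the common distribution key: the "equivalent
-- distribution" case above, no data movement needed.
SELECT c.name, o.order_id FROM custs c JOIN orders o USING (cust_id);

-- Join on a non-distribution column of orders: the planner must place a
-- RemoteSubPath (redistribution) under that side of the join.
SELECT c.name FROM custs c JOIN orders o ON o.order_id = c.cust_id;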
GROUP BY

GROUP BY works much like joins (see the SQL sketch after the function below):

/*
* Grouping preserves distribution if distribution key is the
 * first grouping key or if distribution is replicated.
* In these cases aggregation is fully pushed down to nodes.
* Otherwise we need 2-phase aggregation so put remote subplan
* on top of the result_plan. When adding result agg on top of
* RemoteSubplan first aggregation phase will be pushed down
* automatically.
*/
static Plan *
grouping_distribution(PlannerInfo *root, Plan *plan, int numGroupCols, AttrNumber *groupColIdx, List *current_pathkeys, Distribution **distribution)
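In SQL terms, reusing the hypothetical orders table from the join sketch: grouping by the distribution key pushes the aggregate fully down to the datanodes; any other grouping key gets the two-phase treatment the comment describes.

-- Distribution key is the first grouping key: fully pushed down.
SELECT cust_id, count(*) FROM orders GROUP BY cust_id;

-- Non-distribution grouping key: partial aggregates on the datanodes are
-- gathered through a RemoteSubplan and finalized on top.
SELECT order_id, count(*) FROM orders GROUP BY order_id;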
How query execution is implemented

ExecutorStart

#ifdef PGXC
case T_RemoteQuery:
result = (PlanState *) ExecInitRemoteQuery((RemoteQuery *) node, eflags);
break;
#endif
#ifdef XCP
case T_RemoteSubplan:
result = (PlanState *) ExecInitRemoteSubplan((RemoteSubplan *) node, eflags);
break;
#endif /* XCP */
ExecutorRun is driven mainly by the two functions below, which handle queries and subplans respectively.

#ifdef PGXC
case T_RemoteQueryState:
result = ExecRemoteQuery((RemoteQueryState *) node);
break;
#endif
#ifdef XCP
case T_RemoteSubplanState:
result = ExecRemoteSubplan((RemoteSubplanState *) node);
break;
#endif /* XCP */
Data exchange

pgxl exchanges data between nodes through ShareQueues; each Datanode has one such ShareQueue.

Data distribution types

/*----------
* DistributionType - how to distribute the data
*/
typedef enum DistributionType
{
DISTTYPE_REPLICATION,/* Replicated */
DISTTYPE_HASH,        /* Hash partitioned */
DISTTYPE_ROUNDROBIN,  /* Round Robin */
DISTTYPE_MODULO /* Modulo partitioned */
} DistributionType;
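Each enum value corresponds to a DISTRIBUTE BY clause of XL's CREATE TABLE; a sketch with made-up tables:

CREATE TABLE small_dim (id int, label text)   DISTRIBUTE BY REPLICATION;  -- DISTTYPE_REPLICATION
CREATE TABLE big_fact  (id int, payload text) DISTRIBUTE BY HASH (id);    -- DISTTYPE_HASH
CREATE TABLE staging   (line text)            DISTRIBUTE BY ROUNDROBIN;   -- DISTTYPE_ROUNDROBIN
CREATE TABLE buckets   (id int)               DISTRIBUTE BY MODULO (id);  -- DISTTYPE_MODULO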
Transaction handling

The algorithm is two-phase commit; obviously, not every operation needs 2PC. The condition that triggers it:

/*
* Returns true if 2PC is required for consistent commit: if there was write
* activity on two or more nodes within current transaction.
*/
bool
IsTwoPhaseCommitRequired(bool localWrite)
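What gets driven on each write node is stock PostgreSQL two-phase commit. A plain-PostgreSQL sketch of the sequence described in the comment below (the gid and the UPDATE are invented; max_prepared_transactions must be non-zero on the nodes):

BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;  -- some write
PREPARE TRANSACTION 'xl_txn_12345';  -- the node's "prepared" vote (step 2)
-- once every involved node has prepared (steps 3-4):
COMMIT PREPARED 'xl_txn_12345';
-- if any PREPARE failed, abort instead (step 6):
-- ROLLBACK PREPARED 'xl_txn_12345';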
I will not go into two-phase commit itself here. The comment below comes from postgresql-xc, but XL should follow similar logic; the relevant code lives partly in backend/pgxc/pool and partly in PG's normal transaction-processing path.

/*
 * Do pre-commit processing for remote nodes which includes Datanodes and
 * Coordinators. If more than one node is involved in the transaction write
 * activity, then we must run 2PC. For 2PC, we do the following steps:
 *
 * 1. PREPARE the transaction locally if the local node is involved in the
 *    transaction. If not involved, skip this step and go to the
 *    next step.
 * 2. PREPARE the transaction on all the remote nodes. If any node fails to
 *    PREPARE, directly go to step 6.
 * 3. Now that all the involved nodes are PREPAREd, we can commit the
 *    transaction. We first inform the GTM that the transaction is fully
 *    PREPARED and also supply the list of the nodes involved in the
 *    transaction.
 * 4. COMMIT PREPARED the transaction on all the remote nodes and then
 *    finally COMMIT PREPARED on the local node if it is involved in the
 *    transaction, and start a new transaction so that normal commit processing
 *    works unchanged. Go to step 5.
 * 5. Return and let the normal commit processing resume.
 * 6. Abort by ereporting the error and let normal abort-processing take
 *    charge.
 */
Miscellaneous

barrier

CREATE BARRIER

Name
CREATE BARRIER -- create a new barrier

Synopsis
CREATE BARRIER barrier_name

Description
Note: The following description applies only to Postgres-XL.

CREATE BARRIER is a new SQL command specific to Postgres-XL that creates a new XLOG record on each node of the cluster consistently. Essentially a barrier is a consistent point in the cluster that you can recover to. Note that these are currently created manually, not automatically. Without barriers, if you recover an individual component, it may not be consistent with the other nodes, depending on when it was committed.

A barrier is created via a 2PC-like mechanism from a remote Coordinator, in 3 phases: prepare, execute, and ending.

A new recovery parameter called recovery_target_barrier has been added in recovery.conf. In order to perform a complete PITR recovery, it is necessary to set recovery_target_barrier to the value of a barrier already created, then distribute the recovery.conf to each data folder of each node, then restart the nodes one by one.

The default barrier name is dummy_barrier_id. It is used when no barrier name is specified in CREATE BARRIER.
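A minimal usage sketch (the barrier name is made up):

-- On one Coordinator: record a consistent recovery point on every node.
CREATE BARRIER 'nightly_20141230';
-- Later, for a cluster-wide PITR, each node's recovery.conf would contain:
--   recovery_target_barrier = 'nightly_20141230'
-- after which the nodes are restarted one by one.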
pause / unpause