PostgreSQL的存储系统二:REDOLOG文件存储结构二
REDOLOG文件里的用户数据和数据文件里的用户数据存储结构相同 几个月前同事给台湾一家公司培训《pg9 ad admin》时,有个学员提及WAL里记录的内容为Query时的SQL语句(比如insert等),同事告知WAL里记录的是tuple信息,而非SQL,该学员坚持里面是SQL或SQL+tuple,并说oracle的redo日志里记录的是SQL(不知道这个是从哪里得知的,也许是因为通过日志挖掘可以还原出SQL吧)。于是便看了一下源码(还是开源的好)。 前面我写过一篇文章《PostgreSQL的存储系统二:REDOLOG文件存储结构》,见地址http://beigang.iteye.com/blog/1565121或http://blog.csdn.net/beiigang/article/details/7680905,其中提到Pg XLOG文件的存储格式大致如下: <PageHeaderData> <XLogRecord> <rmgr-specific data> <BkpBlock> <XLogRecData>里面包括<CheckPoint>等 <BkpBlock> <XLogRecData> <BkpBlock> <XLogRecData> …… 用户相关的数据写在XLogRecData结构(定义见下面)的buffer成员里,但具体写成什么样子没有提及,正好这儿再深入讨论一下。
typedef struct XLogRecData
{
	char	   *data;			/* 资源管理器包含数据的开始 */
	uint32		len;			/* 资源管理器包含数据的长度 */
	Buffer		buffer;			/* 有相应数据的buffer,如果有的话 */
	bool		buffer_std;		/* buffer是否有标准pd_lower/pd_upper头 */
	struct XLogRecData *next;	/* 链里的下一个结构 */
} XLogRecData;
为了说清楚这个问题,跑了个例子如下: INSERT INTO TABLE1(ID,GNAME) VALUES(18,'GangBei'); 看这个例子涉及的调用流程前,先回顾一下pg服务进程的调用流程:一切就绪后进入无限循环,等候客户端指令。
Postgres服务进程调用流程图 这个例子的调用流程和《PostgreSQL服务过程中的那些事二:Pg服务进程处理简单查询》系列博文中的流程大致相同,也是调用exec_simple_query方法,和前面《PostgreSQL服务过程中的那些事二:Pg服务进程处理简单查询》中select例子不同的是,本节中insert的例子在portalrun方法里调用了执行器的ExecInsert方法,最终调用了heap_insert方法,在这个方法里完成了记录写入数据文件,并调用了XLogInsert方法,完成了XLOG的WAL日志写入。更具体的方法调用流程参见下面的调用流程图,其他和《PostgreSQL服务过程中的那些事二:Pg服务进程处理简单查询》基本相同的部分略去。
Insert SQL 语句调用流程 在heap_insert方法里,组装好tuple,调用RelationGetBufferForTuple方法找到shmem里缓存数据文件块的buffer,调用RelationPutHeapTuple方法,把组装好的元组放到合适的buffer中合适的位置;然后组装XLogRecData类型变量rdata,把buffer赋给XLogRecData的成员buffer,接着调用XLogInsert方法,并传入rdata,在XLogInsert方法里,用memcpy方法把rdata写入shmem对应的cache里,最后pg通过操作系统的I/O接口把WAL日志和数据写入对应的文件。 既然XLOG里Insert的WAL日志中的用户数据和数据文件中的一样,那我们简单看一下pg中数据文件里的tuple:tuple存放在堆中,一个tuple就是一行表记录,在数据文件的页里存放的结构如下图:
数据文件页面布局图
元组结构图
元组头结构及其各字段的含义见下面:
typedef struct HeapTupleHeaderData
{
	union
	{
		HeapTupleFields t_heap;
		DatumTupleFields t_datum;
	}			t_choice;

	ItemPointerData t_ctid;		/* current TID of this or newer tuple */

	/* Fields below here must match MinimalTupleData! */

	uint16		t_infomask2;	/* number of attributes + various flags */

	uint16		t_infomask;		/* various flag bits, see below */

	uint8		t_hoff;			/* sizeof header incl. bitmap, padding */

	/* ^ - 23 bytes - ^ */

	bits8		t_bits[1];		/* bitmap of NULLs -- VARIABLE LENGTH */

	/* MORE DATA FOLLOWS AT END OF STRUCT */
} HeapTupleHeaderData;
typedefstruct HeapTupleFields { TransactionIdt_xmin; /* inserting xact ID */ TransactionIdt_xmax; /* deleting or locking xact ID */
union { CommandId t_cid; /* inserting or deleting command ID,or both */ TransactionIdt_xvac; /* old-style VACUUM FULL xact ID */ } t_field3; } HeapTupleFields;
typedefstruct DatumTupleFields { int32 datum_len_; /* varlena header (do not touch directly!) */
int32 datum_typmod; /* -1,or identifier of a record type */
Oid datum_typeid; /* composite type OID,or RECORDOID */
/* * Note: field ordering is chosen with thought that Oid might someday * widen to 64 bits. */ } DatumTupleFields;
typedefstruct ItemPointerData { BlockIdDataip_blkid; OffsetNumberip_posid; }
PostgreSQL的元组头结构是MVCC算法的基础。这个以后再说吧。
下面把heap_insert方法和XLogInsert方法贴到了下面,为了突显主题,删掉了其余代码,并把XLOG内容相关变量和方法置为红色,方便串读。
/*
 * heap_insert - insert a tuple into a heap relation (abridged listing).
 *
 * Stamps the tuple header with the current transaction ID and the given
 * command ID, obtains a buffer via RelationGetBufferForTuple, places the
 * tuple into it and marks the buffer dirty.  Unless WAL is skipped
 * (HEAP_INSERT_SKIP_WAL set in options, or RelationNeedsWAL() is false), it
 * then builds a three-element XLogRecData chain and hands it to XLogInsert.
 *
 * The chain consists of:
 *	 rdata[0] - the xl_heap_insert record (not attached to a buffer)
 *	 rdata[1] - the xl_heap_header (attached to the buffer)
 *	 rdata[2] - the tuple bytes from t_bits onward (attached to the buffer)
 *
 * Returns HeapTupleGetOid(tup).
 *
 * NOTE(review): this is a trimmed excerpt; statements of the full function
 * that are not relevant to the XLOG path have been left out.
 */
Oid
heap_insert(Relation relation, HeapTuple tup, CommandId cid,
			int options, BulkInsertState bistate)
{
	TransactionId xid = GetCurrentTransactionId();
	HeapTuple	heaptup;
	Buffer		buffer;
	bool		all_visible_cleared = false;

	/* 1. Assemble the tuple header information */
	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
	HeapTupleHeaderSetXmin(tup->t_data, xid);
	HeapTupleHeaderSetCmin(tup->t_data, cid);
	HeapTupleHeaderSetXmax(tup->t_data, 0);
	tup->t_tableOid = RelationGetRelid(relation);

	heaptup = tup;

	/* 2. Find buffer to insert this tuple into */
	buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
									   InvalidBuffer, options, bistate);

	/*
	 * 3. We're about to do the actual insert -- check for conflict at the
	 * relation or buffer level first, to avoid possibly having to roll back
	 * work we've just done.
	 */
	CheckForSerializableConflictIn(relation, NULL, buffer);

	/* 4. NO EREPORT(ERROR) from here till changes are logged */
	START_CRIT_SECTION();

	RelationPutHeapTuple(relation, buffer, heaptup);

	MarkBufferDirty(buffer);

	/* XLOG stuff */
	if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
	{
		xl_heap_insert xlrec;
		xl_heap_header xlhdr;
		XLogRecPtr	recptr;
		XLogRecData rdata[3];
		Page		page = BufferGetPage(buffer);
		uint8		info = XLOG_HEAP_INSERT;

		/* rdata[0]: the insert record itself, not tied to any buffer */
		xlrec.all_visible_cleared = all_visible_cleared;
		xlrec.target.node = relation->rd_node;
		xlrec.target.tid = heaptup->t_self;
		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfHeapInsert;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
		xlhdr.t_infomask = heaptup->t_data->t_infomask;
		xlhdr.t_hoff = heaptup->t_data->t_hoff;

		/*
		 * note we mark rdata[1] as belonging to buffer; if XLogInsert decides
		 * to write the whole page to the xlog, we don't need to store
		 * xl_heap_header in the xlog.
		 */
		rdata[1].data = (char *) &xlhdr;
		rdata[1].len = SizeOfHeapHeader;
		rdata[1].buffer = buffer;
		rdata[1].buffer_std = true;
		rdata[1].next = &(rdata[2]);

		/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
		rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
		rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
		rdata[2].buffer = buffer;
		rdata[2].buffer_std = true;
		rdata[2].next = NULL;

		recptr = XLogInsert(RM_HEAP_ID, info, rdata);

		/* Stamp the page with the location returned by XLogInsert */
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buffer);

	pgstat_count_heap_insert(relation);

	return HeapTupleGetOid(tup);
}
/*
 * XLogInsert - copy an XLogRecData chain into the XLOG insert buffers
 * (abridged listing).
 *
 * rmid and info are passed to the trace hook; rdata is the head of the chain
 * whose data/len segments are copied with memcpy to Insert->currpos while
 * WALInsertLock is held exclusively.  When a segment does not fit in the
 * remaining free space, the part that fits is copied, the insert position
 * advances to the next buffer via AdvanceXLInsertBuffer, and a continuation
 * record header carrying the remaining length is written there.
 *
 * Returns RecPtr, the location computed by INSERT_RECPTR; the same value is
 * stored in XactLastRecEnd.
 *
 * NOTE(review): this is a trimmed excerpt.  write_len and freespace are read
 * before anything visible here assigns them, updrqst has no visible
 * declaration, and several locals are never used -- presumably the omitted
 * parts of the full function take care of these; confirm against the
 * upstream source before compiling.
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	XLogRecord *record;
	XLogContRecord *contrecord;
	XLogRecPtr	RecPtr;
	XLogRecPtr	WriteRqst;
	uint32		freespace;
	int			curridx;
	XLogRecData *rdt;
	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
	bool		dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
	BkpBlock	dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
	XLogRecPtr	dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
	XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
	XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
	XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
	pg_crc32	rdata_crc;
	uint32		len,
				write_len;
	unsigned	i;

	TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);

	/*
	 * Here we scan the rdata chain, determine which buffers must be backed
	 * up, and compute the CRC values for the data.
	 */

	START_CRIT_SECTION();

	/* Now wait to get insert lock */
	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);

	/* Compute record's XLOG location */
	curridx = Insert->curridx;
	INSERT_RECPTR(RecPtr, Insert, curridx);

	/*
	 * Append the data, including backup blocks if any.
	 *
	 * Copy the data held in the rdata chain into XLOG.
	 */
	while (write_len)
	{
		/* Skip chain entries that carry no data */
		while (rdata->data == NULL)
			rdata = rdata->next;

		if (freespace > 0)
		{
			if (rdata->len > freespace)
			{
				/*
				 * Segment does not fit: copy what fits, then fall through
				 * to switch to the next buffer.
				 */
				memcpy(Insert->currpos, rdata->data, freespace);
				rdata->data += freespace;
				rdata->len -= freespace;
				write_len -= freespace;
			}
			else
			{
				/* Segment fits entirely: copy it and move to the next one */
				memcpy(Insert->currpos, rdata->data, rdata->len);
				freespace -= rdata->len;
				write_len -= rdata->len;
				Insert->currpos += rdata->len;
				rdata = rdata->next;
				continue;
			}
		}

		/* Use next buffer */
		updrqst = AdvanceXLInsertBuffer(false);
		curridx = Insert->curridx;

		/* Insert cont-record header */
		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
		contrecord = (XLogContRecord *) Insert->currpos;
		contrecord->xl_rem_len = write_len;
		Insert->currpos += SizeOfXLogContRecord;
		freespace = INSERT_FREESPACE(Insert);
	}

	LWLockRelease(WALInsertLock);

	XactLastRecEnd = RecPtr;

	END_CRIT_SECTION();

	return RecPtr;
}
下面这个图是WAL日志中存放的有关的INSERT、UPDATE、DELETE操作的内容,该图引自《Internals Of PostgreSQL Wal》
就到这儿吧。 ----------------- 转载请注明出处: blog.csdn.net/beiigang beigang.iteye.com (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!