PostgreSQL中的ProcessRepliesIfAny函数分析

本篇内容主要讲解“PostgreSQL中的ProcessRepliesIfAny函数分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中的ProcessRepliesIfAny函数分析”吧!

专注于为中小企业提供网站建设、网站制作服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业江安免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了超过千家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

调用栈如下:

(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, 
    nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, 
    wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, 
    wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe ) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
    at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
    at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

一、数据结构

N/A

二、源码解读

ProcessRepliesIfAny
在streaming期间,处理接收到的消息,同时检查远程终端是否关闭了连接,执行相关处理.
代码不多也不复杂,可自行阅读.

/*
 * Process any incoming messages while streaming. Also checks if the remote
 * end has closed the connection.
 * 在streaming期间,处理接收到的消息.
 * 同时检查远程终端是否关闭了连接,执行相关处理.
 */
static void
ProcessRepliesIfAny(void)
{
    unsigned char firstchar;
    int         r;
    bool        received = false;
    //当前时间
    last_processing = GetCurrentTimestamp();
    for (;;)
    {
        //---------- 循环接收相关消息
        pq_startmsgread();
        r = pq_getbyte_if_available(&firstchar);
        if (r < 0)
        {
            /* unexpected error or EOF */
            //未知异常或者EOF
            ereport(COMMERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected EOF on standby connection")));
            //进程退出
            proc_exit(0);
        }
        if (r == 0)
        {
            /* no data available without blocking */
            //已无阻塞的消息数据,退出
            pq_endmsgread();
            break;
        }
        /* Read the message contents */
        //读取消息内容
        resetStringInfo(&reply_message);
        if (pq_getmessage(&reply_message, 0))
        {
            ereport(COMMERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected EOF on standby connection")));
            proc_exit(0);
        }
        /*
         * If we already received a CopyDone from the frontend, the frontend
         * should not send us anything until we've closed our end of the COPY.
         * XXX: In theory, the frontend could already send the next command
         * before receiving the CopyDone, but libpq doesn't currently allow
         * that.
         * 如果已在前台接收到CopyDone消息,前台不应该再发送消息,直至关闭COPY.
         * XXX:理论上来说,在接收到CopyDone前,前台可能已经发送了下一个命令,但libpq不允许这种情况发生
         */
        if (streamingDoneReceiving && firstchar != 'X')
            ereport(FATAL,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
                            firstchar)));
        /* Handle the very limited subset of commands expected in this phase */
        //处理有限几个命令
        switch (firstchar)
        {
                /*
                 * 'd' means a standby reply wrapped in a CopyData packet.
                 * 'd'意味着standby节点的应答封装了CopyData包
                 */
            case 'd':
                ProcessStandbyMessage();
                received = true;
                break;
                /*
                 * CopyDone means the standby requested to finish streaming.
                 * Reply with CopyDone, if we had not sent that already.
                 * CopyDone意味着standby节点请求结束streaming.
                 * 如尚未发送,则使用CopyDone应答.
                 */
            case 'c':
                if (!streamingDoneSending)
                {
                    pq_putmessage_noblock('c', NULL, 0);
                    streamingDoneSending = true;
                }
                streamingDoneReceiving = true;
                received = true;
                break;
                /*
                 * 'X' means that the standby is closing down the socket.
                 * 'X'意味着standby节点正在关闭socket
                 */
            case 'X':
                proc_exit(0);
            default:
                ereport(FATAL,
                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                         errmsg("invalid standby message type \"%c\"",
                                firstchar)));
        }
    }
    /*
     * Save the last reply timestamp if we've received at least one reply.
     * 如接收到至少一条应答信息,则保存最后的应答时间戳.
     */
    if (received)
    {
        last_reply_timestamp = last_processing;
        waiting_for_ping_response = false;
    }
}

二、跟踪分析

在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点

(gdb) set follow-fork-mode child
(gdb) b ProcessRepliesIfAny
Breakpoint 2 at 0x85343b: file walsender.c, line 1597.
(gdb) c
Continuing.
Breakpoint 2, ProcessRepliesIfAny () at walsender.c:1597
1597        bool        received = false;
(gdb)

查看进程信息

[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1376     1  0 14:16 ?        00:00:00 /appdb/xdb/pg11.2/bin/postgres
xdb       1377  1376  0 14:16 ?        00:00:00 postgres: logger   
xdb       1550  1376  0 16:53 ?        00:00:00 postgres: checkpointer   
xdb       1551  1376  0 16:53 ?        00:00:00 postgres: background writer   
xdb       1552  1376  0 16:53 ?        00:00:00 postgres: walwriter   
xdb       1553  1376  0 16:53 ?        00:00:00 postgres: autovacuum launcher  
xdb       1554  1376  0 16:53 ?        00:00:00 postgres: archiver   
xdb       1555  1376  0 16:53 ?        00:00:00 postgres: stats collector   
xdb       1556  1376  0 16:53 ?        00:00:00 postgres: logical replication launcher  
xdb       1633  1376  0 17:26 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40528) idle

循环接收相关消息

(gdb) n
1599        last_processing = GetCurrentTimestamp();
(gdb) 
1603            pq_startmsgread();
(gdb) 
1604            r = pq_getbyte_if_available(&firstchar);
(gdb) 
1605            if (r < 0)
(gdb) p r
$1 = 1
(gdb) p firstchar
$2 = 100 'd'
(gdb)

命令是’d’,执行相关处理

(gdb) n
1613            if (r == 0)
(gdb) 
1621            resetStringInfo(&reply_message);
(gdb) 
1622            if (pq_getmessage(&reply_message, 0))
(gdb) 
1637            if (streamingDoneReceiving && firstchar != 'X')
(gdb) 
1644            switch (firstchar)
(gdb) 
1650                    ProcessStandbyMessage();
(gdb) 
1651                    received = true;
(gdb) 
1652                    break;
(gdb) 
1681        }
(gdb)

设置断点

(gdb) b walsender.c:1643
Breakpoint 3 at 0x8535b6: file walsender.c, line 1643.
(gdb) b walsender.c:1672
Breakpoint 4 at 0x85361a: file walsender.c, line 1672.
(gdb) c
Continuing.
Breakpoint 3, ProcessRepliesIfAny () at walsender.c:1644
1644            switch (firstchar)
(gdb) 
Continuing.
...
Breakpoint 4, ProcessRepliesIfAny () at walsender.c:1673
1673                    proc_exit(0);
(gdb)

进程即将退出,查看进程信息

[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1376     1  0 14:16 ?        00:00:00 /appdb/xdb/pg11.2/bin/postgres
xdb       1377  1376  0 14:16 ?        00:00:00 postgres: logger   
xdb       1550  1376  0 16:53 ?        00:00:00 postgres: checkpointer   
xdb       1551  1376  0 16:53 ?        00:00:00 postgres: background writer   
xdb       1552  1376  0 16:53 ?        00:00:00 postgres: walwriter   
xdb       1553  1376  0 16:53 ?        00:00:00 postgres: autovacuum launcher  
xdb       1554  1376  0 16:53 ?        00:00:00 postgres: archiver   
xdb       1555  1376  0 16:53 ?        00:00:00 postgres: stats collector   
xdb       1556  1376  0 16:53 ?        00:00:00 postgres: logical replication launcher  
xdb       1633  1376  0 17:26 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40528) idle
xdb       1637  1376  0 17:27 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40530) streaming 0/5D075248
[xdb@localhost ~]$

进程退出(PID=1633),启动了新的进程(PID=1637)

(gdb) n
[Inferior 2 (process 1633) exited normally]
(gdb)

到此,相信大家对“PostgreSQL中的ProcessRepliesIfAny函数分析”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


当前名称:PostgreSQL中的ProcessRepliesIfAny函数分析
分享路径:http://scyanting.com/article/ihsepp.html