PostgreSQL源码解读(155)-后台进程#7(walsender#3)

本节继续介绍PostgreSQL的后台进程walsender,重点介绍的是调用栈中的函数WalSndLoop->WaitLatchOrSocket->WaitEventSetWait->WaitEventSetWaitBlock.
调用栈如下:


(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, 
    nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, 
    wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, 
    wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe ) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
    at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
    at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

一、数据结构

WaitEvent
WaitEvent等待事件结构体


typedef struct WaitEvent
{
    //在event数据结构体中的位置
    int         pos;            /* position in the event data structure */
    //已触发的事件
    uint32      events;         /* triggered events */
    //与该事件相关的socket fd(文件描述符)
    pgsocket    fd;             /* socket fd associated with event */
    //在AddWaitEventToSet中提供的指针
    void       *user_data;      /* pointer provided in AddWaitEventToSet */
#ifdef WIN32
    //WIN32 是否已重置?
    bool        reset;          /* Is reset of the event required? */
#endif
} WaitEvent;

WaitEventSet
WaitEventSet等待事件集


/* typedef in latch.h */
//latch.h中定义的类型
struct WaitEventSet
{
    //注册的事件数
    int         nevents;        /* number of registered events */
    //该集合中最大的事件数
    int         nevents_space;  /* maximum number of events in this set */
    /*
     * Array, of nevents_space length, storing the definition of events this
     * set is waiting for.
     * 数组,长度为nevents_space,存储该集合中等待的事件定义
     */
    WaitEvent  *events;
    /*
     * If WL_LATCH_SET is specified in any wait event, latch is a pointer to
     * said latch, and latch_pos the offset in the ->events array. This is
     * useful because we check the state of the latch before performing doing
     * syscalls related to waiting.
     * 如在等待事件中指定了WL_LATCH_SET,latch指向所述的latch,
     *   latch_pos是events数组中的偏移.
     * 这很有用因为在执行与等待相关syscalls前可以检查latch的状态.
     */
    Latch      *latch;
    int         latch_pos;
    /*
     * WL_EXIT_ON_PM_DEATH is converted to WL_POSTMASTER_DEATH, but this flag
     * is set so that we'll exit immediately if postmaster death is detected,
     * instead of returning.
     * WL_EXIT_ON_PM_DEATH转换为WL_POSTMASTER_DEATH,
     *   但该标志设置以便一旦检测到postmaster挂掉就可以马上退出而不是返回.
     */
    bool        exit_on_postmaster_death;
#if defined(WAIT_USE_EPOLL)
    //使用epoll
    int         epoll_fd;
    /* epoll_wait returns events in a user provided arrays, allocate once */
    //epoll_wait在用户提供的数组中返回事件,只需要分配一次
    struct epoll_event *epoll_ret_events;
#elif defined(WAIT_USE_POLL)
    //使用poll
    /* poll expects events to be waited on every poll() call, prepare once */
    //poll期望事件在每一次poll()调用时等待,只需要准备一次
    struct pollfd *pollfds;
#elif defined(WAIT_USE_WIN32)
    //WIN32
    /*
     * Array of windows events. The first element always contains
     * pgwin32_signal_event, so the remaining elements are offset by one (i.e.
     * event->pos + 1).
     * windows事件数组.
     * 第一个元素通常存储的是pgwin32_signal_event,
     *   因此遗留的元素是相对该元素的偏移(比如event->pos + 1)
     */
    HANDLE     *handles;
#endif
};

二、源码解读

WalSndLoop
通过Copy处理WAL流数据的walsender进程主循环.
其主要逻辑如下:
1.获取时间戳,设置相关标记
2.进入循环
2.1重置MyLatch,检查中断
2.2处理最近接收的请求或信号
2.3检查客户端输入
2.4如果从客户端接收到CopyDone信号,并且buffer为空,则退出循环
2.5执行相关处理并设置WalSndCaughtUp变量
2.6如WalSndCaughtUp为T并且没有挂起待处理的数据
2.6.1设置状态并判断接收的信号执行相关处理
2.7检查复制是否超时以及是否需要发送Keepalive
2.8如处于CaughtUp状态并且仍未完成Streaming或者存在挂起的数据
则设置等待时间,执行WaitLatchOrSocket


/* Main loop of walsender process that streams the WAL over Copy messages. */
//通过Copy处理WAL流数据的walsender进程主循环
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
    /*
     * Initialize the last reply timestamp. That enables timeout processing
     * from hereon.
     * 初始化最后一个响应时间戳.
     * 在这里开始,启用超时处理.
     */
    last_reply_timestamp = GetCurrentTimestamp();
    waiting_for_ping_response = false;
    /*
     * Loop until we reach the end of this timeline or the client requests to
     * stop streaming.
     * 循环,直至达到时间线的末尾或者客户端请求停止streaming.
     */
    for (;;)
    {
        /* Clear any already-pending wakeups */
        //清除所有已挂起的wakeups.
        ResetLatch(MyLatch);
        //检查中断
        CHECK_FOR_INTERRUPTS();
        /* Process any requests or signals received recently */
        //处理最近接收的请求或信号
        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
            SyncRepInitConfig();
        }
        /* Check for input from the client */
        //检查客户端输入
        ProcessRepliesIfAny();
        /*
         * If we have received CopyDone from the client, sent CopyDone
         * ourselves, and the output buffer is empty, it's time to exit
         * streaming.
         * 如果从客户端接收到CopyDone信号,自行发送CopyDone,
         *   同时输出buffer是空的,是时候退出streaming了.
         */
        if (streamingDoneReceiving && streamingDoneSending &&
            !pq_is_send_pending())
            //跳出循环
            break;
        /*
         * If we don't have any pending data in the output buffer, try to send
         * some more.  If there is some, we don't bother to call send_data
         * again until we've flushed it ... but we'd better assume we are not
         * caught up.
         * 如果在输出缓冲区中没有挂起的数据,尝试发送更多的数据.
         * 如果已存在数据,那么在刷新之前不需要再次调用send_data ...
         *   但我们最好假定没有捕获这些数据.
         */
        if (!pq_is_send_pending())
            //发送数据
            send_data();
        else
            WalSndCaughtUp = false;
        /* Try to flush pending output to the client */
        //尝试刷新挂起的输出到客户端
        if (pq_flush_if_writable() != 0)
            WalSndShutdown();
        /* If nothing remains to be sent right now ... */
        //如果现在没有遗留数据
        if (WalSndCaughtUp && !pq_is_send_pending())
        {
            /*
             * If we're in catchup state, move to streaming.  This is an
             * important state change for users to know about, since before
             * this point data loss might occur if the primary dies and we
             * need to failover to the standby. The state change is also
             * important for synchronous replication, since commits that
             * started to wait at that point might wait for some time.
             * 如果处于catchup状态,切换到streaming.
             * 对于客户端来说,让它们知道状态变换是很重要的,
             *   因为在此时点前如果主节点崩溃需要切换到备机时数据可能会出现丢失.
             * 对于同步复制而已,这个状态变更也很重要,
             *   因为在这个时间点开始的commits操作可能会等待一段时间.
             */
            if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
            {
                ereport(DEBUG1,
                        (errmsg("\"%s\" has now caught up with upstream server",
                                application_name)));
                //设置状态
                WalSndSetState(WALSNDSTATE_STREAMING);
            }
            /*
             * When SIGUSR2 arrives, we send any outstanding logs up to the
             * shutdown checkpoint record (i.e., the latest record), wait for
             * them to be replicated to the standby, and exit. This may be a
             * normal termination at shutdown, or a promotion, the walsender
             * is not sure which.
             * 在接收到SIGUSR2信号时,我们将所有未完成的日志发送到shutdown checkpoint record
             *   (比如最后一条记录),等待这些日志复制到standby节点,执行完毕则退出.
             * 这可能是在关闭过程中的正常终止,也可能是升级,但walsender无法确认.
             */
            if (got_SIGUSR2)
                WalSndDone(send_data);
        }
        /* Check for replication timeout. */
        //检查复制超时
        WalSndCheckTimeOut();
        /* Send keepalive if the time has come */
        //是时候发送keepalive了
        WalSndKeepaliveIfNecessary();
        /*
         * We don't block if not caught up, unless there is unsent data
         * pending in which case we'd better block until the socket is
         * write-ready.  This test is only needed for the case where the
         * send_data callback handled a subset of the available data but then
         * pq_flush_if_writable flushed it all --- we should immediately try
         * to send more.
         * 如果没有caught up则不要阻塞,除非有未发送的数据挂起,
         *   在这种情况下,进程最好阻塞直至socket write-ready.
         * 这个测试仅在send_data回调函数处理可用数据的一部分但pq_flush_if_writable进行全部刷新时需要.
         * 这时候我们应该马上尝试发送更多的数据.
         */
        if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
        {
            long        sleeptime;
            int         wakeEvents;
            wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
                         WL_SOCKET_READABLE;
            /*
             * Use fresh timestamp, not last_processed, to reduce the chance
             * of reaching wal_sender_timeout before sending a keepalive.
             * 在发送keepalive前,
             *   使用刷新时间戳而不是last_processed来减少超过wal_sender_timeout
             */
            sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
            if (pq_is_send_pending())
                wakeEvents |= WL_SOCKET_WRITEABLE;
            /* Sleep until something happens or we time out */
            //休眠直至某些事情发送或者超时
            (void) WaitLatchOrSocket(MyLatch, wakeEvents,
                                     MyProcPort->sock, sleeptime,
                                     WAIT_EVENT_WAL_SENDER_MAIN);
        }
    }
    return;
}

WaitLatchOrSocket
该函数与WaitLatch类似,但额外有一个socket参数,用于WL SOCKET *.
在WaitEventSet中添加等待事件,调用函数WaitEventSetWait,等待事件的发生或者超时.


/*
 * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
 * conditions.
 * 与WaitLatch类似,但额外有一个socket参数,用于WL_SOCKET_*
 *
 * When waiting on a socket, EOF and error conditions always cause the socket
 * to be reported as readable/writable/connected, so that the caller can deal
 * with the condition.
 * 在写入socket时,EOF和错误的条件通常会导致socket被视为可读写和已连接,因此调用者可处理该条件.
 *
 * wakeEvents must include either WL_EXIT_ON_PM_DEATH for automatic exit
 * if the postmaster dies or WL_POSTMASTER_DEATH for a flag set in the
 * return value if the postmaster dies.  The latter is useful for rare cases
 * where some behavior other than immediate exit is needed.
 * wakeEvents必须包含WL_EXIT_ON_PM_DEATH用于在postmaster崩溃时的自动退出
 *   或者WL_POSTMASTER_DEATH用于在postmaster崩溃时返回标记.
 * 后者用于以下极少数的情况:需要执行某些动作而不是马上退出.
 *
 * NB: These days this is just a wrapper around the WaitEventSet API. When
 * using a latch very frequently, consider creating a longer living
 * WaitEventSet instead; that's more efficient.
 * 注意:现在,这只是WaitEventSet API的一个包装器.
 * 在非常频繁的使用latch的情况下,应考虑创建一个长生命周期的WaitEventSet,这会更有效率.
 */
int
WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
                  long timeout, uint32 wait_event_info)
{
    int         ret = 0;
    int         rc;
    WaitEvent   event;
    //创建WaitEventSet,性能上的考虑
    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
    if (wakeEvents & WL_TIMEOUT)
        //超时
        Assert(timeout >= 0);
    else
        timeout = -1;
    if (wakeEvents & WL_LATCH_SET)
        //设置了LATCH
        AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
                          latch, NULL);
    /* Postmaster-managed callers must handle postmaster death somehow. */
    //Postmaster-managed的调用者必须处理postmaster崩溃的情况.
    Assert(!IsUnderPostmaster ||
           (wakeEvents & WL_EXIT_ON_PM_DEATH) ||
           (wakeEvents & WL_POSTMASTER_DEATH));
    if ((wakeEvents & WL_POSTMASTER_DEATH) && IsUnderPostmaster)
        AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
                          NULL, NULL);
    if ((wakeEvents & WL_EXIT_ON_PM_DEATH) && IsUnderPostmaster)
        AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
                          NULL, NULL);
    if (wakeEvents & WL_SOCKET_MASK)
    {
        int         ev;
        ev = wakeEvents & WL_SOCKET_MASK;
        AddWaitEventToSet(set, ev, sock, NULL, NULL);
    }
    rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
    if (rc == 0)
        ret |= WL_TIMEOUT;
    else
    {
        ret |= event.events & (WL_LATCH_SET |
                               WL_POSTMASTER_DEATH |
                               WL_SOCKET_MASK);
    }
    //释放资源
    FreeWaitEventSet(set);
    return ret;
}

WaitEventSetWait
等待加入到等待事件集合中的事件发生,或者直至超时.
循环等待,调用WaitEventSetWaitBlock,直至有事件发生.


/*
 * Wait for events added to the set to happen, or until the timeout is
 * reached.  At most nevents occurred events are returned.
 * 等待加入到等待事件集合中的事件发生,或者直至超时.
 * 在大多数情况下,返回nevenets已发生事件.
 *
 * If timeout = -1, block until an event occurs; if 0, check sockets for
 * readiness, but don't block; if > 0, block for at most timeout milliseconds.
 * 如果timeout = -1,阻塞直至事件发生.
 * 如果timeout = 0,检查sockets用于读取,但不阻塞.
 * 如果timeout > 0,阻塞直至超时.
 *
 * Returns the number of events occurred, or 0 if the timeout was reached.
 * 返回出现的实践计数,如出现超时则返回0
 *
 * Returned events will have the fd, pos, user_data fields set to the
 * values associated with the registered event.
 * 返回的事件会包含fd,pos,user_data字段信息用于与注册事件关联.
 */
int
WaitEventSetWait(WaitEventSet *set, long timeout,
                 WaitEvent *occurred_events, int nevents,
                 uint32 wait_event_info)
{
    int         returned_events = 0;
    instr_time  start_time;
    instr_time  cur_time;
    long        cur_timeout = -1;
    Assert(nevents > 0);
    /*
     * Initialize timeout if requested.  We must record the current time so
     * that we can determine the remaining timeout if interrupted.
     * 如请求则初始化超时处理.
     * 必须记录当前时间以便可以确定在中断发送时剩余时间.
     */
    if (timeout >= 0)
    {
        INSTR_TIME_SET_CURRENT(start_time);
        Assert(timeout >= 0 && timeout <= INT_MAX);
        cur_timeout = timeout;
    }
    pgstat_report_wait_start(wait_event_info);
#ifndef WIN32
    waiting = true;
#else
    /* Ensure that signals are serviced even if latch is already set */
    pgwin32_dispatch_queued_signals();
#endif
    while (returned_events == 0)//未有事件发送
    {
        int         rc;
        /*
         * Check if the latch is set already. If so, leave the loop
         * immediately, avoid blocking again. We don't attempt to report any
         * other events that might also be satisfied.
         * 检查latch是否已设置.
         * 如设置,则马上退出循环,避免再次阻塞.
         * 不需要尝试报告其他可能已满足条件的事件.
         *
         * If someone sets the latch between this and the
         * WaitEventSetWaitBlock() below, the setter will write a byte to the
         * pipe (or signal us and the signal handler will do that), and the
         * readiness routine will return immediately.
         * 如果某进程在这里和接下来的WaitEventSetWaitBlock()之间设置了latch,
         *   设置器会写入一个字节到pipe中(或者发信号给我们,信号处理器会处理该信号),
         *   readiness例程会马上返回.
         *
         * On unix, If there's a pending byte in the self pipe, we'll notice
         * whenever blocking. Only clearing the pipe in that case avoids
         * having to drain it every time WaitLatchOrSocket() is used. Should
         * the pipe-buffer fill up we're still ok, because the pipe is in
         * nonblocking mode. It's unlikely for that to happen, because the
         * self pipe isn't filled unless we're blocking (waiting = true), or
         * from inside a signal handler in latch_sigusr1_handler().
         * 在Unix平台,如果在自己的pipe中存在挂起的字节,则会注意到何时出现的阻塞.
         * 在这种情况下只有清理pipe中的数据以避免每次使用WaitLatchOrSocket()时都必须将其清除.
         * pipe-buffer是否填满进程是没有问题的,因为pipe处于非阻塞模式.
         * 这不同于事件发生,因为self pipe直到阻塞(waiting = true),
         *   或者在latch_sigusr1_handler()中的信号控制器中不会填充数据.
         *
         * On windows, we'll also notice if there's a pending event for the
         * latch when blocking, but there's no danger of anything filling up,
         * as "Setting an event that is already set has no effect.".
         * 在Windows平台,我们同时会注意到对于阻塞时,latch上是否存在挂起的事件,
         *   但对于填充来说没有任何的问题,这可以视为"设置已设置的事件是不会有问题的".
         *
         * Note: we assume that the kernel calls involved in latch management
         * will provide adequate synchronization on machines with weak memory
         * ordering, so that we cannot miss seeing is_set if a notification
         * has already been queued.
         * 注意:假定涉及latch管理的内核调用将在内存顺序较差的机器上提供足够的同步保护,
         *   这样在通知已入队列时不会错过is_set标记.
         */
        if (set->latch && set->latch->is_set)
        {
            //已设置latch
            occurred_events->fd = PGINVALID_SOCKET;
            occurred_events->pos = set->latch_pos;
            occurred_events->user_data =
                set->events[set->latch_pos].user_data;//用户数据
            occurred_events->events = WL_LATCH_SET;
            occurred_events++;
            returned_events++;
            //退出循环
            break;
        }
        /*
         * Wait for events using the readiness primitive chosen at the top of
         * this file. If -1 is returned, a timeout has occurred, if 0 we have
         * to retry, everything >= 1 is the number of returned events.
         * 等待使用在该文件最前面选择的原始readiness方法的事件发生.
         * 如果返回-1,意味着超时发生,如为0,只能再次重试,>= 1是返回的events数量.
         */
        rc = WaitEventSetWaitBlock(set, cur_timeout,
                                   occurred_events, nevents);
        if (rc == -1)
            //超时
            break;              /* timeout occurred */
        else
            //事件数
            returned_events = rc;
        /* If we're not done, update cur_timeout for next iteration */
        //如为完成,更新cur_timeout已备下次循环迭代
        if (returned_events == 0 && timeout >= 0)
        {
            INSTR_TIME_SET_CURRENT(cur_time);
            INSTR_TIME_SUBTRACT(cur_time, start_time);
            cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
            if (cur_timeout <= 0)
                break;
        }
    }
#ifndef WIN32
    waiting = false;
#endif
    pgstat_report_wait_end();
    return returned_events;
}

WaitEventSetWaitBlock
使用linux’s epoll_wait(2)等待.
调用epoll_wait方法,如发生事件,遍历events执行相关处理.


#if defined(WAIT_USE_EPOLL)
/*
 * Wait using linux's epoll_wait(2).
 * 使用linux's epoll_wait(2)等待.
 *
 * This is the preferable wait method, as several readiness notifications are
 * delivered, without having to iterate through all of set->events. The return
 * epoll_event struct contain a pointer to our events, making association
 * easy.
 * 这是一种更好的等待方法,随着多个readiness通知的交付,而不需要通过迭代所有的set->events.
 * 返回的epoll_event结构体保存了指向事件的指针,让关联变更简单.
 */
static inline int
WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
                      WaitEvent *occurred_events, int nevents)
{
    int         returned_events = 0;
    int         rc;
    WaitEvent  *cur_event;
    struct epoll_event *cur_epoll_event;
    /* Sleep */
    //休眠
    rc = epoll_wait(set->epoll_fd, set->epoll_ret_events,
                    nevents, cur_timeout);
    /* Check return code */
    //检查返回代码
    if (rc < 0)
    {
        /*  is okay, otherwise complain */
        //验证EINTR是否ok,否则报错
        if (errno != EINTR)
        {
            waiting = false;
            ereport(ERROR,
                    (errcode_for_socket_access(),
                     errmsg("epoll_wait() failed: %m")));
        }
        return 0;
    }
    else if (rc == 0)
    {
        /* timeout exceeded */
        //超时
        return -1;
    }
    /*
     * At least one event occurred, iterate over the returned epoll events
     * until they're either all processed, or we've returned all the events
     * the caller desired.
     * 至少有一个事件发生了,迭代返回的epoll事件,直至全部被处理或者已返回调用者锁指定的所有事件
     */
    for (cur_epoll_event = set->epoll_ret_events;
         cur_epoll_event < (set->epoll_ret_events + rc) &&
         returned_events < nevents;
         cur_epoll_event++)
    {
        /* epoll's data pointer is set to the associated WaitEvent */
        //epoll的数据指针设置为关联WaitEvent
        cur_event = (WaitEvent *) cur_epoll_event->data.ptr;
        occurred_events->pos = cur_event->pos;
        occurred_events->user_data = cur_event->user_data;
        occurred_events->events = 0;
        if (cur_event->events == WL_LATCH_SET &&
            cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
        {
            //------------- 出现事件
            /* There's data in the self-pipe, clear it. */
            //在self-pipe中存在数据,清除之
            drainSelfPipe();
            if (set->latch->is_set)
            {
                occurred_events->fd = PGINVALID_SOCKET;
                occurred_events->events = WL_LATCH_SET;
                occurred_events++;
                returned_events++;
            }
        }
        else if (cur_event->events == WL_POSTMASTER_DEATH &&
                 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
        {
            //------------- postmaster挂了
            /*
             * We expect an EPOLLHUP when the remote end is closed, but
             * because we don't expect the pipe to become readable or to have
             * any errors either, treat those cases as postmaster death, too.
             *
             * Be paranoid about a spurious event signalling the postmaster as
             * being dead.  There have been reports about that happening with
             * older primitives (select(2) to be specific), and a spurious
             * WL_POSTMASTER_DEATH event would be painful. Re-checking doesn't
             * cost much.
             */
            if (!PostmasterIsAliveInternal())
            {
                if (set->exit_on_postmaster_death)
                    proc_exit(1);
                occurred_events->fd = PGINVALID_SOCKET;
                occurred_events->events = WL_POSTMASTER_DEATH;
                occurred_events++;
                returned_events++;
            }
        }
        else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
        {
            //--------- socket可读写
            Assert(cur_event->fd != PGINVALID_SOCKET);
            if ((cur_event->events & WL_SOCKET_READABLE) &&
                (cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
            {
                /* data available in socket, or EOF */
                //socket已有可用数据,或者已达末尾(EOF)
                occurred_events->events |= WL_SOCKET_READABLE;
            }
            if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
                (cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)))
            {
                /* writable, or EOF */
                //可写或者EOF
                occurred_events->events |= WL_SOCKET_WRITEABLE;
            }
            if (occurred_events->events != 0)
            {
                occurred_events->fd = cur_event->fd;
                occurred_events++;
                returned_events++;
            }
        }
    }
    return returned_events;
}
#elif defined(WAIT_USE_POLL)
/*
 * Wait using poll(2).
 *
 * This allows to receive readiness notifications for several events at once,
 * but requires iterating through all of set->pollfds.
 */
static inline int
WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
                      WaitEvent *occurred_events, int nevents)
{
    int         returned_events = 0;
    int         rc;
    WaitEvent  *cur_event;
    struct pollfd *cur_pollfd;
    /* Sleep */
    rc = poll(set->pollfds, set->nevents, (int) cur_timeout);
    /* Check return code */
    if (rc < 0)
    {
        /* EINTR is okay, otherwise complain */
        if (errno != EINTR)
        {
            waiting = false;
            ereport(ERROR,
                    (errcode_for_socket_access(),
                     errmsg("poll() failed: %m")));
        }
        return 0;
    }
    else if (rc == 0)
    {
        /* timeout exceeded */
        return -1;
    }
    for (cur_event = set->events, cur_pollfd = set->pollfds;
         cur_event < (set->events + set->nevents) &&
         returned_events < nevents;
         cur_event++, cur_pollfd++)
    {
        /* no activity on this FD, skip */
        if (cur_pollfd->revents == 0)
            continue;
        occurred_events->pos = cur_event->pos;
        occurred_events->user_data = cur_event->user_data;
        occurred_events->events = 0;
        if (cur_event->events == WL_LATCH_SET &&
            (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
        {
            /* There's data in the self-pipe, clear it. */
            drainSelfPipe();
            if (set->latch->is_set)
            {
                occurred_events->fd = PGINVALID_SOCKET;
                occurred_events->events = WL_LATCH_SET;
                occurred_events++;
                returned_events++;
            }
        }
        else if (cur_event->events == WL_POSTMASTER_DEATH &&
                 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
        {
            /*
             * We expect an POLLHUP when the remote end is closed, but because
             * we don't expect the pipe to become readable or to have any
             * errors either, treat those cases as postmaster death, too.
             *
             * Be paranoid about a spurious event signalling the postmaster as
             * being dead.  There have been reports about that happening with
             * older primitives (select(2) to be specific), and a spurious
             * WL_POSTMASTER_DEATH event would be painful. Re-checking doesn't
             * cost much.
             */
            if (!PostmasterIsAliveInternal())
            {
                if (set->exit_on_postmaster_death)
                    proc_exit(1);
                occurred_events->fd = PGINVALID_SOCKET;
                occurred_events->events = WL_POSTMASTER_DEATH;
                occurred_events++;
                returned_events++;
            }
        }
        else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
        {
            int         errflags = POLLHUP | POLLERR | POLLNVAL;
            Assert(cur_event->fd >= PGINVALID_SOCKET);
            if ((cur_event->events & WL_SOCKET_READABLE) &&
                (cur_pollfd->revents & (POLLIN | errflags)))
            {
                /* data available in socket, or EOF */
                occurred_events->events |= WL_SOCKET_READABLE;
            }
            if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
                (cur_pollfd->revents & (POLLOUT | errflags)))
            {
                /* writeable, or EOF */
                occurred_events->events |= WL_SOCKET_WRITEABLE;
            }
            if (occurred_events->events != 0)
            {
                occurred_events->fd = cur_event->fd;
                occurred_events++;
                returned_events++;
            }
        }
    }
    return returned_events;
}
#elif defined(WAIT_USE_WIN32)
/*
 * Wait using Windows' WaitForMultipleObjects().
 *
 * Unfortunately this will only ever return a single readiness notification at
 * a time.  Note that while the official documentation for
 * WaitForMultipleObjects is ambiguous about multiple events being "consumed"
 * with a single bWaitAll = FALSE call,
 * https://blogs.msdn.microsoft.com/oldnewthing/20150409-00/?p=44273 confirms
 * that only one event is "consumed".
 */
static inline int
WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
                      WaitEvent *occurred_events, int nevents)
{
    int         returned_events = 0;
    DWORD       rc;
    WaitEvent  *cur_event;
    /* Reset any wait events that need it */
    for (cur_event = set->events;
         cur_event < (set->events + set->nevents);
         cur_event++)
    {
        if (cur_event->reset)
        {
            WaitEventAdjustWin32(set, cur_event);
            cur_event->reset = false;
        }
        /*
         * Windows does not guarantee to log an FD_WRITE network event
         * indicating that more data can be sent unless the previous send()
         * failed with WSAEWOULDBLOCK.  While our caller might well have made
         * such a call, we cannot assume that here.  Therefore, if waiting for
         * write-ready, force the issue by doing a dummy send().  If the dummy
         * send() succeeds, assume that the socket is in fact write-ready, and
         * return immediately.  Also, if it fails with something other than
         * WSAEWOULDBLOCK, return a write-ready indication to let our caller
         * deal with the error condition.
         */
        if (cur_event->events & WL_SOCKET_WRITEABLE)
        {
            char        c;
            WSABUF      buf;
            DWORD       sent;
            int         r;
            buf.buf = &c;
            buf.len = 0;
            r = WSASend(cur_event->fd, &buf, 1, &sent, 0, NULL, NULL);
            if (r == 0 || WSAGetLastError() != WSAEWOULDBLOCK)
            {
                occurred_events->pos = cur_event->pos;
                occurred_events->user_data = cur_event->user_data;
                occurred_events->events = WL_SOCKET_WRITEABLE;
                occurred_events->fd = cur_event->fd;
                return 1;
            }
        }
    }
    /*
     * Sleep.
     *
     * Need to wait for ->nevents + 1, because signal handle is in [0].
     */
    rc = WaitForMultipleObjects(set->nevents + 1, set->handles, FALSE,
                                cur_timeout);
    /* Check return code */
    if (rc == WAIT_FAILED)
        elog(ERROR, "WaitForMultipleObjects() failed: error code %lu",
             GetLastError());
    else if (rc == WAIT_TIMEOUT)
    {
        /* timeout exceeded */
        return -1;
    }
    if (rc == WAIT_OBJECT_0)
    {
        /* Service newly-arrived signals */
        pgwin32_dispatch_queued_signals();
        return 0;               /* retry */
    }
    /*
     * With an offset of one, due to the always present pgwin32_signal_event,
     * the handle offset directly corresponds to a wait event.
     */
    cur_event = (WaitEvent *) &set->events[rc - WAIT_OBJECT_0 - 1];
    occurred_events->pos = cur_event->pos;
    occurred_events->user_data = cur_event->user_data;
    occurred_events->events = 0;
    if (cur_event->events == WL_LATCH_SET)
    {
        if (!ResetEvent(set->latch->event))
            elog(ERROR, "ResetEvent failed: error code %lu", GetLastError());
        if (set->latch->is_set)
        {
            occurred_events->fd = PGINVALID_SOCKET;
            occurred_events->events = WL_LATCH_SET;
            occurred_events++;
            returned_events++;
        }
    }
    else if (cur_event->events == WL_POSTMASTER_DEATH)
    {
        /*
         * Postmaster apparently died.  Since the consequences of falsely
         * returning WL_POSTMASTER_DEATH could be pretty unpleasant, we take
         * the trouble to positively verify this with PostmasterIsAlive(),
         * even though there is no known reason to think that the event could
         * be falsely set on Windows.
         */
        if (!PostmasterIsAliveInternal())
        {
            if (set->exit_on_postmaster_death)
                proc_exit(1);
            occurred_events->fd = PGINVALID_SOCKET;
            occurred_events->events = WL_POSTMASTER_DEATH;
            occurred_events++;
            returned_events++;
        }
    }
    else if (cur_event->events & WL_SOCKET_MASK)
    {
        WSANETWORKEVENTS resEvents;
        HANDLE      handle = set->handles[cur_event->pos + 1];
        Assert(cur_event->fd);
        occurred_events->fd = cur_event->fd;
        ZeroMemory(&resEvents, sizeof(resEvents));
        if (WSAEnumNetworkEvents(cur_event->fd, handle, &resEvents) != 0)
            elog(ERROR, "failed to enumerate network events: error code %u",
                 WSAGetLastError());
        if ((cur_event->events & WL_SOCKET_READABLE) &&
            (resEvents.lNetworkEvents & FD_READ))
        {
            /* data available in socket */
            occurred_events->events |= WL_SOCKET_READABLE;
            /*------
             * WaitForMultipleObjects doesn't guarantee that a read event will
             * be returned if the latch is set at the same time.  Even if it
             * did, the caller might drop that event expecting it to reoccur
             * on next call.  So, we must force the event to be reset if this
             * WaitEventSet is used again in order to avoid an indefinite
             * hang.  Refer https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
             * for the behavior of socket events.
             *------
             */
            cur_event->reset = true;
        }
        if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
            (resEvents.lNetworkEvents & FD_WRITE))
        {
            /* writeable */
            occurred_events->events |= WL_SOCKET_WRITEABLE;
        }
        if ((cur_event->events & WL_SOCKET_CONNECTED) &&
            (resEvents.lNetworkEvents & FD_CONNECT))
        {
            /* connected */
            occurred_events->events |= WL_SOCKET_CONNECTED;
        }
        if (resEvents.lNetworkEvents & FD_CLOSE)
        {
            /* EOF/error, so signal all caller-requested socket flags */
            occurred_events->events |= (cur_event->events & WL_SOCKET_MASK);
        }
        if (occurred_events->events != 0)
        {
            occurred_events++;
            returned_events++;
        }
    }
    return returned_events;
}
#endif

三、跟踪分析

在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点

郾城网站制作公司哪家好,找创新互联!从网页设计、网站建设、微信开发、APP开发、自适应网站建设等网站项目制作,到程序开发,运营维护。创新互联从2013年开始到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选创新互联。


[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1376     1  1 14:16 pts/0    00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1376
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b WalSndLoop
Breakpoint 1 at 0x853e63: file walsender.c, line 2111.
(gdb) c
Continuing.
[New process 1450]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f17cfa9a8c0 (LWP 1450)]
Breakpoint 1, WalSndLoop (send_data=0x8547fe ) at walsender.c:2111
2111        last_reply_timestamp = GetCurrentTimestamp();
(gdb)

获取时间戳,设置相关标记


(gdb) n
2112        waiting_for_ping_response = false;
(gdb) p last_reply_timestamp
$1 = 606818445090174
(gdb)

重置MyLatch


(gdb) n
2124            if (!PostmasterIsAlive())
(gdb) 
2128            ResetLatch(MyLatch);
(gdb) p MyLatch
$2 = (struct Latch *) 0x7f17c46994d4
(gdb) p *MyLatch
$3 = {is_set = 1, is_shared = true, owner_pid = 1465}
(gdb) n
2130            CHECK_FOR_INTERRUPTS();
(gdb) p *MyLatch
$4 = {is_set = 0, is_shared = true, owner_pid = 1465}
(gdb)

处理最近接收到的信号


(gdb) n
2133            if (ConfigReloadPending)
(gdb) 
2141            ProcessRepliesIfAny();
(gdb) 
[Inferior 2 (process 1465) exited normally]
(gdb)

进程退出,新产生了进程1466


xdb       1466  1376  0 16:41 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40516) streaming 0/5D032830

跟踪1466进程


(gdb) attach 1466
Attaching to program: /appdb/xdb/pg11.2/bin/postgres, process 1466
Reading symbols from /lib64/libpthread.so.0...(no debugging symbols found)...done.
[Thread debugging using libthread_db enabled]
...

执行SQL


testdb=# drop table t1;
DROP TABLE

接收到信号SIGUSR1,查看调用栈如下


Program received signal SIGUSR1, User defined signal 1.
0x00007f17cde2d903 in __epoll_wait_nocancel () from /lib64/libc.so.6
(gdb) bt
#0  0x00007f17cde2d903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x296e7c8, cur_timeout=29999, occurred_events=0x7fffed781d00, 
    nevents=1) at latch.c:1048

DONE!

四、参考资料

PG Source Code


本文标题:PostgreSQL源码解读(155)-后台进程#7(walsender#3)
网址分享:http://scyanting.com/article/pjjjep.html