go进阶-计时器


timer

可以控制时间,确保应用程序的某段代码可以在某个时刻运行。在go中可以单次执行,也可以循环执行。

例如:

fmt.Printf("time.Now().Unix(): %v\n", time.Now().Unix())
// time.Now().Unix(): 1652706497

获取Unix时间戳。

基本特性

Timer

timer := time.NewTimer(time.Second * 2)
<-timer.C // 阻塞两秒后才会继续执行
fmt.Println("done")

根据此特性可以设置:如定时两秒后执行某个程序。

func main() {
    v := make(chan struct{})
    timer := time.AfterFunc(time.Second*2, func() {
        fmt.Println("do sth...")
        v <- struct{}{} // 信号,表明go func结束
    })
    defer timer.Stop()
    <-v
}

Ticker

到时间之后会重新计时。

如,每秒执行一次,直到十秒:

unc main() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    done := make(chan bool)
    go func() {
        time.Sleep(10 * time.Second)
        done <- true
    }()

    for {
        select {
        case <-done:
            fmt.Println("time up...")
            return
        case t := <-ticker.C:
            fmt.Printf("t.Unix(): %v\n", t.Unix())
        }
    }
}

最小堆:四叉堆

image-20220516211908991

父节点和子节点之间有大小关系,同辈的节点之间没有大小关系。

数据结构

暴露出来的Timer结构体:

type Timer struct {
    C <-chan Time // 接收Timer触发的事件
    r runtimeTimer
}

内部runtimeTimer:

type runtimeTimer struct {
    pp       uintptr  // 计时器所在的处理器的指针地址
    when     int64    // 计时器被唤醒的时间
    period   int64    // 再次被唤醒的时间when + peroid
    f        func(any, uintptr) // 唤醒时候的回调函数
    arg      any // 回调函数参数
    seq      uintptr // 回调函数参数,仅在网络场景用
    nextwhen int64  // 计时器状态为modified时,设置到when字段
    status   uint32  // 计时器状态
}

每个timer存储在对应的处理器中:

type p struct {
    ...
    timersLock mutex
    timers []*timer
    ...
}

timers是一个四叉堆的结构,小的在上面,先执行:

image-20220516213654647

脑海中浮现出两个问题

  1. 为啥是四叉堆?不是二叉堆?别的堆?

首先,四叉堆新加入节点或者对节点元素进行修改,导致元素上浮时,比较次数肯定比二叉堆少,而且而且其次,N叉堆对数据访问更加集中在数组前部分,更有利于缓存。(堆的数据结构而言,数组的话。例如刚开始是第N个,对于二叉堆而言,接下来要访问它的父节点,也就是N/2个,但是如果是四叉堆得话,就需要访问N/4个来比较)

说的也的确比较有道理,这样看是越多叉越好。那为啥不是8呢?

一位仁兄@萌叔说的感觉比较有道理:

假设是d叉堆,n个节点

首先比较插入,节点上浮:

比较次数是(logN / logd),也就是logd_n。例如二叉树,16个节点,第一层1个节点,第二层2个节点,第三层4个节点,第4层8个节点,第5层1个节点。比较的次数也就是二叉堆的层数。

image-20220516215656540

删除的话,是节点下沉:

以这个图为例

image-20220516211908991

因为子节点之间没有关联,所以四个节点都要访问,才能找到最小的。也就是比上浮的操作每层乘以一个d。

O(d * log n / log d)

image-20220516220524884

这样一比较,四叉堆是个最好的选择。

好像挺有道理

  1. 堆何时变化,也就是何时把那个到了时间的头结点打掉?是遍历吗?还是时间到了会有个信号通知?

下面再说。先看看用法。

用法和实现原理

定时器状态

image-20220528110516915

创建定时器

Timer用法

timer := time.NewTimer(time.Second * 2)
<-timer.C // 阻塞两秒后才会继续执行
fmt.Println("done")

Timer原理

// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        r: runtimeTimer{
            when: when(d),  //1 设置到期时间
            f:    sendTime, //2 设置回调函数,发送当前时间
            arg:  c,
        },
    }
    startTimer(&t.r)
    return t
}

根据前面数据结构提到的Timer结构体,和几个辅助函数:

// when is a helper function for setting the 'when' field of a runtimeTimer.
// It returns what the time will be, in nanoseconds, Duration d in the future.
// If d is negative, it is ignored. If the returned value would be less than
// zero because of an overflow, MaxInt64 is returned.
func when(d Duration) int64 {
    if d <= 0 {
        return runtimeNano()
    }
    t := runtimeNano() + int64(d)
    if t < 0 {
        // N.B. runtimeNano() and d are always positive, so addition
        // (including overflow) will never result in t == 0.
        t = 1<<63 - 1 // math.MaxInt64
    }
    return t
}

// sendTime does a non-blocking send of the current time on c.
func sendTime(c any, seq uintptr) {
    select {
    case c.(chan Time) <- Now():
    default:
    }
}

//具体函数在runtime里面
func startTimer(*runtimeTimer)

runtime/time.go
// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
    if raceenabled {
        racerelease(unsafe.Pointer(t))
    }
    addtimer(t)
}

Ticker

func NewTicker(d Duration) *Ticker {
    if d <= 0 {
        panic(errors.New("non-positive interval for NewTicker"))
    }
    // Give the channel a 1-element time buffer.
    // If the client falls behind while reading, we drop ticks
    // on the floor until the client catches up.
    c := make(chan Time, 1)
    t := &Ticker{
        C: c,
        r: runtimeTimer{
            when:   when(d),
            period: int64(d),
            f:      sendTime,
            arg:    c,
        },
    }
    startTimer(&t.r)
    return t
}

Timer的区别就是多了一个peroid(再次被唤醒的时间when + peroid)。

启动定时器

刚才提到:创建定时器里面的startTimer

//具体函数在runtime里面
func startTimer(*runtimeTimer)

runtime/time.go
// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
    if raceenabled {
        racerelease(unsafe.Pointer(t))
    }
    addtimer(t)
}

具体addtimer的行为呢?

func addtimer(t *timer) {
    // ... 校验
    t.status = timerWaiting // 等待计时器启动

    when := t.when

    // Disable preemption while using pp to avoid changing another P's heap.
    mp := acquirem()

    pp := getg().m.p.ptr()
    lock(&pp.timersLock)
    cleantimers(pp) // 清理计时器队列
    doaddtimer(pp, t) // 添加到当前处理器的堆中
    unlock(&pp.timersLock)
  // 中断正在阻塞的网络轮询,根据时间判断是否需要唤醒网络轮询器中休眠的线程。
    wakeNetPoller(when) 

    releasem(mp)
}

wakeNetPoller的行为还是有点迷惑。

停止计时器

timer := time.NewTicker(time.Second * 2)
go func() {
    <-timer.C // 阻塞两秒后才会继续执行
    fmt.Println("done")
}()
timer.Stop() // 不等定时器结束,我不想要了,stop

原理

func (t *Ticker) Stop() {
    stopTimer(&t.r)
}
// 同样对应一个runtime里面的函数
func stopTimer(*runtimeTimer) bool

// runtime/time.go

// stopTimer stops a timer.
// It reports whether t was stopped before being run.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
    return deltimer(t)
}

// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
    for {
        switch s := atomic.Load(&t.status); s {
        case timerWaiting, timerModifiedLater:
            // Prevent preemption while the timer is in timerModifying.
            // This could lead to a self-deadlock. See #38070.
            mp := acquirem()
            if atomic.Cas(&t.status, s, timerModifying) {
                // Must fetch t.pp before changing status,
                // as cleantimers in another goroutine
                // can clear t.pp of a timerDeleted timer.
                tpp := t.pp.ptr()
                if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
                    badTimer()
                }
                releasem(mp)
                atomic.Xadd(&tpp.deletedTimers, 1)
                // Timer was not yet run.
                return true
            } else {
                releasem(mp)
            }
        case timerModifiedEarlier:
            // Prevent preemption while the timer is in timerModifying.
            // This could lead to a self-deadlock. See #38070.
            mp := acquirem()
            if atomic.Cas(&t.status, s, timerModifying) {
                // Must fetch t.pp before setting status
                // to timerDeleted.
                tpp := t.pp.ptr()
                if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
                    badTimer()
                }
                releasem(mp)
                atomic.Xadd(&tpp.deletedTimers, 1)
                // Timer was not yet run.
                return true
            } else {
                releasem(mp)
            }
        case timerDeleted, timerRemoving, timerRemoved:
            // Timer was already run.
            return false
        case timerRunning, timerMoving:
            // The timer is being run or moved, by a different P.
            // Wait for it to complete.
            osyield()
        case timerNoStatus:
            // Removing timer that was never added or
            // has already been run. Also see issue 21874.
            return false
        case timerModifying:
            // Simultaneous calls to deltimer and modtimer.
            // Wait for the other call to complete.
            osyield()
        default:
            badTimer()
        }
    }
}

并不在这里真的删除处理器上挂载的timer节点,而是设置状态。

修改/重置计数器

timer := time.NewTicker(time.Second * 2)
go func() {
    <-timer.C // 等待计时器结束再接下来执行
    fmt.Println("done")
}()
time.Sleep(time.Second)
timer.Reset(time.Second * 10) // 改为十秒后执行

原理

func (t *Ticker) Reset(d Duration) {
    if d <= 0 {
        panic("non-positive interval for Ticker.Reset")
    }
    if t.r.f == nil {
        panic("time: Reset called on uninitialized Ticker")
    }
    modTimer(&t.r, when(d), int64(d), t.r.f, t.r.arg, t.r.seq)
}

不出意外的,modTimer函数实体仍然在runtime包中:

// modtimer modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
    if when <= 0 {
        throw("timer when must be positive")
    }
    if period < 0 {
        throw("timer period must be non-negative")
    }

    status := uint32(timerNoStatus)
    wasRemoved := false
    var pending bool
    var mp *m
loop:
    for {
        switch status = atomic.Load(&t.status); status {
        case timerWaiting, timerModifiedEarlier, timerModifiedLater:
            // Prevent preemption while the timer is in timerModifying.
            // This could lead to a self-deadlock. See #38070.
            mp = acquirem()
            if atomic.Cas(&t.status, status, timerModifying) {
                pending = true // timer not yet run
            break loop
            }
            releasem(mp)
        case timerNoStatus, timerRemoved:
            // Prevent preemption while the timer is in timerModifying.
            // This could lead to a self-deadlock. See #38070.
            mp = acquirem()

            // Timer was already run and t is no longer in a heap.
            // Act like addtimer.
            if atomic.Cas(&t.status, status, timerModifying) {
                wasRemoved = true
                pending = false // timer already run or stopped
                break loop
            }
            releasem(mp)
        case timerDeleted:
            // Prevent preemption while the timer is in timerModifying.
            // This could lead to a self-deadlock. See #38070.
            mp = acquirem()
            if atomic.Cas(&t.status, status, timerModifying) {
                atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
                pending = false // timer already stopped
                break loop
            }
            releasem(mp)
        case timerRunning, timerRemoving, timerMoving:
            // The timer is being run or moved, by a different P.
            // Wait for it to complete.
            osyield()
        case timerModifying:
            // Multiple simultaneous calls to modtimer.
            // Wait for the other call to complete.
            osyield()
        default:
            badTimer()
        }
    }

    t.period = period
    t.f = f
    t.arg = arg
    t.seq = seq

    if wasRemoved {
        t.when = when
        pp := getg().m.p.ptr()
        lock(&pp.timersLock)
        doaddtimer(pp, t)
        unlock(&pp.timersLock)
        if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
            badTimer()
        }
        releasem(mp)
        wakeNetPoller(when)
    } else {
        // The timer is in some other P's heap, so we can't change
        // the when field. If we did, the other P's heap would
        // be out of order. So we put the new when value in the
        // nextwhen field, and let the other P set the when field
        // when it is prepared to resort the heap.
        t.nextwhen = when

        newStatus := uint32(timerModifiedLater)
        if when < t.when {
            newStatus = timerModifiedEarlier
        }

        tpp := t.pp.ptr()

        if newStatus == timerModifiedEarlier {
            updateTimerModifiedEarliest(tpp, when)
        }

        // Set the new status of the timer.
        if !atomic.Cas(&t.status, timerModifying, newStatus) {
            badTimer()
        }
        releasem(mp)

        // If the new status is earlier, wake up the poller.
        if newStatus == timerModifiedEarlier {
            wakeNetPoller(when)
        }
    }

    return pending
}

做的还是变更timer的状态。

触发计时器

Go1.14之后,Timer归属到了各个处理器之中,这里有两部分内容:

  • 通过调度器在调度时进行计时器的触发。
  • 通过系统监控检查并触发计时器(到期未执行)。

调度器触发

  • 调用 NewTimer,timer.After, timer.AfterFunc 生产 timer, 加入对应的 P 的堆上。
  • 调用 timer.Stop, timer.Reset 改变对应的 timer 的状态。
  • GMP 在调度周期内中会调用 checkTimers ,遍历该 P 的 timer 堆上的元素,根据对应 timer 的状态执行真的操作。
  • 当前处理器如果没有可执行的Timer,并且没有可执行的G,那么按照调度模型,就会去窃取其他计时器和 G。

系统监控触发

即使是通过每次调度器调度和窃取的时候触发,但毕竟是具有一定的随机和不确定性。因此系统监控触发依然是一个兜底保障,在 Go 语言中 runtime.sysmon 方法承担了这一个责任。

剩余问题

wakeNetPoller的行为还是有点迷惑。例如在addTimer里面:

func addtimer(t *timer) {
    // ... 校验
    t.status = timerWaiting // 等待计时器启动

    when := t.when

    // Disable preemption while using pp to avoid changing another P's heap.
    mp := acquirem()

    pp := getg().m.p.ptr()
    lock(&pp.timersLock)
    cleantimers(pp) // 清理计时器队列
    doaddtimer(pp, t) // 添加到当前处理器的堆中
    unlock(&pp.timersLock)
  // 中断正在阻塞的网络轮询,根据时间判断是否需要唤醒网络轮询器中休眠的线程。
    wakeNetPoller(when) 

    releasem(mp)
}

具体是啥看下定义:

// wakeNetPoller wakes up the thread sleeping in the network poller if it isn't
// going to wake up before the when argument; or it wakes an idle P to service
// timers and the network poller if there isn't one already.
func wakeNetPoller(when int64) {
    if atomic.Load64(&sched.lastpoll) == 0 {
        // In findrunnable we ensure that when polling the pollUntil
        // field is either zero or the time to which the current
        // poll is expected to run. This can have a spurious wakeup
        // but should never miss a wakeup.
        pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
        if pollerPollUntil == 0 || pollerPollUntil > when {
            netpollBreak()
        }
    } else {
        // There are no threads in the network poller, try to get
        // one there so it can handle new timers.
        if GOOS != "plan9" { // Temporary workaround - see issue #42303.
            wakep()
        }
    }
}

唤醒网络轮询器中休眠的线程,检查计时器被唤醒的时间(when)是否在当前轮询预期运行的时间(pollerPollUntil)内,若是唤醒。

总结

其实还是有点迷惑的,但是再深入就牵扯到更深更杂的东西了,螺旋上升吧,继续努力,共勉。