Redis 源码分析定时任务原理

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 本文主要是基于 redis 6.2 源码进行分析定时事件的数据结构和常见操作。

数据结构


在 redis 中通过 aeTimeEvent 结构来创建定时任务事件,代码如下:


/* Time event structure */
typedef struct aeTimeEvent {
    // 标识符
    long long id; /* time event identifier. */
    // 定时纳秒数
    monotime when;
    // 定时回调函数
    aeTimeProc *timeProc;
    // 注销定时器时候的回调函数
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being
         * freed in recursive time event calls. */
} aeTimeEvent;


常见操作


1. 创建定时事件


redis 中最重要的定时函数且是周期执行的函数,使用的是 serverCron 函数。在 redis 中由于定时任务比较少,因此并没有严格的按照过期时间来排序的,而是按照 id自增 + 头插法 来保证基本有序。


if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  serverPanic("Can't create event loop timers.");
  exit(1);
}
//创建定时器对象
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    // 头插法 
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}


数据结构如下图所示:


image.png


2. 触发定时事件


redis 中是采用 IO 复用来进行定时任务的。


  • 查找距离现在最近的定时事件,见 usUntilEarliestTimer 函数:


/* How many microseconds until the first timer should fire.
 * If there are no timers, -1 is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
 */
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    if (te == NULL) return -1;
    aeTimeEvent *earliest = NULL;
    while (te) {
        if (!earliest || te->when < earliest->when)
            earliest = te;
        te = te->next;
    }
    monotime now = getMonotonicUs();
    return (now >= earliest->when) ? 0 : earliest->when - now;
}


这里时间复杂度可能比较高,实际中需要结合具体场景使用。


  • 更新剩余过期时间,想想为啥呢?因为我们前面提到过,IO 复用有可能因为 IO 事件返回,所以需要更新。


if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
  usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
  tv.tv_sec = usUntilTimer / 1000000;
  tv.tv_usec = usUntilTimer % 1000000;
  tvp = &tv;
} else {
  if (flags & AE_DONT_WAIT) {
    // 不等待
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
  } else {
    /* Otherwise we can block */
    tvp = NULL; /* wait forever */
  }
}


3. 执行定时事件


一次性的执行完直接删除,周期性的执行完在重新添加到链表。


/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
  int processed = 0;
  aeTimeEvent *te;
  long long maxId;
  te = eventLoop->timeEventHead;
  maxId = eventLoop->timeEventNextId-1;
  monotime now = getMonotonicUs();
  // 删除定时器
  while(te) {
    long long id;
    // 下一轮中对事件进行删除
    /* Remove events scheduled for deletion. */
    if (te->id == AE_DELETED_EVENT_ID) {
      aeTimeEvent *next = te->next;
      /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
      if (te->refcount) {
        te = next;
        continue;
      }
      if (te->prev)
        te->prev->next = te->next;
      else
        eventLoop->timeEventHead = te->next;
      if (te->next)
        te->next->prev = te->prev;
      if (te->finalizerProc) {
        te->finalizerProc(eventLoop, te->clientData);
        now = getMonotonicUs();
      }
      zfree(te);
      te = next;
      continue;
    }
    if (te->id > maxId) {
      te = te->next;
      continue;
    }
    if (te->when <= now) {
      int retval;
      id = te->id;
      te->refcount++;
      // timeProc 函数返回值 retVal 为时间事件执行的间隔
      retval = te->timeProc(eventLoop, id, te->clientData);
      te->refcount--;
      processed++;
      now = getMonotonicUs();
      if (retval != AE_NOMORE) {
        te->when = now + retval * 1000;
      } else {
        // 如果超时了,那么标记为删除
        te->id = AE_DELETED_EVENT_ID;
      }
    }
    // 执行下一个
    te = te->next;
  }
  return processed;
}


总结


  1. 优点:实现简单


  1. 缺点:如果定时任务很多,效率比较低。


相关文章
|
18天前
|
消息中间件 存储 缓存
Redis 服务器全方位介绍:从入门到核心原理
Redis是一款高性能、基于内存的NoSQL数据库,支持String、Hash、List、Set、ZSet等丰富数据结构,广泛用于缓存、分布式锁、排行榜、消息队列等场景。支持持久化(RDB/AOF)、主从复制、集群部署,具备原子操作与高并发能力,是构建高可用系统的核心组件之一。(239字)
144 0
|
2月前
|
存储 缓存 监控
Redis分区的核心原理与应用实践
Redis分区通过将数据分散存储于多个节点,提升系统处理高并发与大规模数据的能力。本文详解分区原理、策略及应用实践,涵盖哈希、范围、一致性哈希等分片方式,分析其适用场景与性能优势,并探讨电商秒杀、物联网等典型用例,为构建高性能、可扩展的Redis集群提供参考。
119 0
|
9月前
|
消息中间件 缓存 NoSQL
Redis原理—5.性能和使用总结
本文详细探讨了Redis的阻塞原因、性能优化、缓存相关问题及数据库与缓存的一致性问题。同时还列举了不同缓存操作方案下的并发情况,帮助读者理解并选择合适的缓存管理策略。最终得出结论,在实际应用中应尽量采用“先更新数据库再删除缓存”的方案,并结合异步重试机制来保证数据的一致性和系统的高性能。
Redis原理—5.性能和使用总结
|
NoSQL Redis
Redis 执行 Lua保证原子性原理
Redis 执行 Lua 保证原子性原理
948 1
|
9月前
|
NoSQL 算法 安全
Redis原理—1.Redis数据结构
本文介绍了Redis 的主要数据结构及应用。
Redis原理—1.Redis数据结构
|
9月前
|
缓存 NoSQL Redis
Redis原理—2.单机数据库的实现
本文概述了Redis数据库的核心结构和操作机制。
Redis原理—2.单机数据库的实现
|
9月前
|
存储 缓存 NoSQL
Redis原理—4.核心原理摘要
Redis 是一个基于内存的高性能NoSQL数据库,支持分布式集群和持久化。其网络通信模型采用多路复用监听与文件事件机制,通过单线程串行化处理大量并发请求,确保高效运行。本文主要简单介绍了 Redis 的核心特性。
|
9月前
|
缓存 NoSQL Redis
Redis原理—3.复制、哨兵和集群
详细介绍了Redis的复制原理、哨兵原理和集群原理。
|
9月前
|
运维 NoSQL 算法
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
474 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
|
监控 NoSQL Redis
看完这篇就能弄懂Redis的集群的原理了
看完这篇就能弄懂Redis的集群的原理了
632 0