Simulation Core
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

172 lines
4.4 KiB

  1. #ifndef __HIREDIS_LIBUV_H__
  2. #define __HIREDIS_LIBUV_H__
  3. #include <stdlib.h>
  4. #include <uv.h>
  5. #include "../hiredis.h"
  6. #include "../async.h"
  7. #include <string.h>
  8. typedef struct redisLibuvEvents {
  9. redisAsyncContext* context;
  10. uv_poll_t handle;
  11. uv_timer_t timer;
  12. int events;
  13. } redisLibuvEvents;
  14. static void redisLibuvPoll(uv_poll_t* handle, int status, int events) {
  15. redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
  16. int ev = (status ? p->events : events);
  17. if (p->context != NULL && (ev & UV_READABLE)) {
  18. redisAsyncHandleRead(p->context);
  19. }
  20. if (p->context != NULL && (ev & UV_WRITABLE)) {
  21. redisAsyncHandleWrite(p->context);
  22. }
  23. }
  24. static void redisLibuvAddRead(void *privdata) {
  25. redisLibuvEvents* p = (redisLibuvEvents*)privdata;
  26. if (p->events & UV_READABLE) {
  27. return;
  28. }
  29. p->events |= UV_READABLE;
  30. uv_poll_start(&p->handle, p->events, redisLibuvPoll);
  31. }
  32. static void redisLibuvDelRead(void *privdata) {
  33. redisLibuvEvents* p = (redisLibuvEvents*)privdata;
  34. p->events &= ~UV_READABLE;
  35. if (p->events) {
  36. uv_poll_start(&p->handle, p->events, redisLibuvPoll);
  37. } else {
  38. uv_poll_stop(&p->handle);
  39. }
  40. }
  41. static void redisLibuvAddWrite(void *privdata) {
  42. redisLibuvEvents* p = (redisLibuvEvents*)privdata;
  43. if (p->events & UV_WRITABLE) {
  44. return;
  45. }
  46. p->events |= UV_WRITABLE;
  47. uv_poll_start(&p->handle, p->events, redisLibuvPoll);
  48. }
  49. static void redisLibuvDelWrite(void *privdata) {
  50. redisLibuvEvents* p = (redisLibuvEvents*)privdata;
  51. p->events &= ~UV_WRITABLE;
  52. if (p->events) {
  53. uv_poll_start(&p->handle, p->events, redisLibuvPoll);
  54. } else {
  55. uv_poll_stop(&p->handle);
  56. }
  57. }
  58. static void on_timer_close(uv_handle_t *handle) {
  59. redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
  60. p->timer.data = NULL;
  61. if (!p->handle.data) {
  62. // both timer and handle are closed
  63. hi_free(p);
  64. }
  65. // else, wait for `on_handle_close`
  66. }
  67. static void on_handle_close(uv_handle_t *handle) {
  68. redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
  69. p->handle.data = NULL;
  70. if (!p->timer.data) {
  71. // timer never started, or timer already destroyed
  72. hi_free(p);
  73. }
  74. // else, wait for `on_timer_close`
  75. }
  76. // libuv removed `status` parameter since v0.11.23
  77. // see: https://github.com/libuv/libuv/blob/v0.11.23/include/uv.h
  78. #if (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR < 11) || \
  79. (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR == 11 && UV_VERSION_PATCH < 23)
  80. static void redisLibuvTimeout(uv_timer_t *timer, int status) {
  81. (void)status; // unused
  82. #else
  83. static void redisLibuvTimeout(uv_timer_t *timer) {
  84. #endif
  85. redisLibuvEvents *e = (redisLibuvEvents*)timer->data;
  86. redisAsyncHandleTimeout(e->context);
  87. }
  88. static void redisLibuvSetTimeout(void *privdata, struct timeval tv) {
  89. redisLibuvEvents* p = (redisLibuvEvents*)privdata;
  90. uint64_t millsec = tv.tv_sec * 1000 + tv.tv_usec / 1000.0;
  91. if (!p->timer.data) {
  92. // timer is uninitialized
  93. if (uv_timer_init(p->handle.loop, &p->timer) != 0) {
  94. return;
  95. }
  96. p->timer.data = p;
  97. }
  98. // updates the timeout if the timer has already started
  99. // or start the timer
  100. uv_timer_start(&p->timer, redisLibuvTimeout, millsec, 0);
  101. }
  102. static void redisLibuvCleanup(void *privdata) {
  103. redisLibuvEvents* p = (redisLibuvEvents*)privdata;
  104. p->context = NULL; // indicate that context might no longer exist
  105. if (p->timer.data) {
  106. uv_close((uv_handle_t*)&p->timer, on_timer_close);
  107. }
  108. uv_close((uv_handle_t*)&p->handle, on_handle_close);
  109. }
  110. static int redisLibuvAttach(redisAsyncContext* ac, uv_loop_t* loop) {
  111. redisContext *c = &(ac->c);
  112. if (ac->ev.data != NULL) {
  113. return REDIS_ERR;
  114. }
  115. ac->ev.addRead = redisLibuvAddRead;
  116. ac->ev.delRead = redisLibuvDelRead;
  117. ac->ev.addWrite = redisLibuvAddWrite;
  118. ac->ev.delWrite = redisLibuvDelWrite;
  119. ac->ev.cleanup = redisLibuvCleanup;
  120. ac->ev.scheduleTimer = redisLibuvSetTimeout;
  121. redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p));
  122. if (p == NULL)
  123. return REDIS_ERR;
  124. memset(p, 0, sizeof(*p));
  125. if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) {
  126. return REDIS_ERR;
  127. }
  128. ac->ev.data = p;
  129. p->handle.data = p;
  130. p->context = ac;
  131. return REDIS_OK;
  132. }
  133. #endif