背景:需要在一个线程中从Redis订阅(SUBSCRIBE)消息,并且希望可以随时终止该线程并正常释放资源。因此选择了hiredis的异步(Asynchronous)模式(我个人更愿意称之为非阻塞模式:由event loop主动检查并分发事件的方式,本质上更接近同步模式),并选用libev作为event loop。下面贴出整理后的使用代码。

参考:
https://github.com/redis/hiredis
http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod

需求

  • 循环线程可以随时终止
    使用一个管道(pipe),读端加入libev event loop的可读监控,需要退出时在写端写入数据,这样event loop就可以随时中断了。
  • 循环线程终止后正常释放资源
    event loop中断后,即可按正常流程释放资源。
  • 对于连接断开要做出重连
    通过redisAsyncSetDisconnectCallback设置断开回调,在回调中中断event loop;退出循环后再检查是需要重连还是退出线程。需要注意的是,hiredis的异步上下文redisAsyncContext会在disconnect回调返回后被库主动释放,因此在disconnect回调中将保存的上下文指针置为NULL,以便后续检查,防止重复释放。
  • 对于连接无响应要有健康检查
    通过在libev中加入定时器(ev_timer)来实现健康检查。由于hiredis的异步上下文执行过SUBSCRIBE后就不能再用来发送所需的PING命令,因此额外准备了一个仅用于健康检查的异步上下文。
  • 重连尝试失败后要等待一段时间后再重试
    同样通过定时器完成。
  • 异常事件要有ERROR等级日志

连接断开的场景可以使用重启redis模拟。

连接无响应的场景可以使用iptables命令 sudo iptables -A INPUT -p tcp --dport 6379 -j DROP 模拟(测试结束后把 -A 换成 -D 再执行一次即可删除该规则)。

代码

编译命令:
gcc main.c sb_redis.c -Wall -g -lhiredis -lev -lpthread

代码如下:

sb_redis.h
#ifndef SB_REDIS_H
#define SB_REDIS_H

/*
 * Non-blocking Redis subscriber built on the hiredis async API and libev.
 *
 * Typical use:
 *   sb = sb_redis_new(...);
 *   sb_redis_create_subscribe_loop(sb);
 *   ...
 *   sb_redis_destroy_subscribe_loop(sb);
 *   sb_redis_cleanup(sb);
 */

/*
 * Message callback. It is called on the subscribe thread with the payload of
 * each message published to the channel (`len` bytes at `buf`). The return
 * value is ignored.
 * NOTE: `buf` points into a hiredis reply. Assume it is not valid after the
 * callback returns, and copy the data if it must be kept.
 */
typedef int(data_recv_func)(char *buf, int len);

/* Opaque subscriber handle. */
struct sb_redis;

/*
 * Create a subscriber for `channel` on server:port and connect it before
 * returning. `pwd` may be NULL (no AUTH). `timeout` is in seconds and bounds
 * connect/AUTH waits and health-check PING replies.
 * Returns NULL on failure.
 */
struct sb_redis *sb_redis_new(const char *server, int port, const char *pwd,
                              int timeout, const char *channel, data_recv_func *func);

/* Send SUBSCRIBE and start the loop thread. Returns 0 on success, -1 on failure. */
int sb_redis_create_subscribe_loop(struct sb_redis *sb);

/* Stop the loop thread started by sb_redis_create_subscribe_loop and wait for it. */
void sb_redis_destroy_subscribe_loop(struct sb_redis *sb);

/* Free the subscriber and every resource it owns. */
void sb_redis_cleanup(struct sb_redis *sb);

#endif /* SB_REDIS_H */
sb_redis.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <stddef.h>
#include <stdarg.h>
#include <stdatomic.h>
#include <time.h>
#include <pthread.h>
#include <signal.h>

#include <hiredis/adapters/libev.h>

#include "sb_redis.h"


/* Logging helpers: forward the level, file, line and function name to _log. */
#define LOG_ERROR(fmt, ...) _log("ERROR", __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define LOG_INFO(fmt, ...) _log("INFO ", __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define LOG_DEBUG(fmt, ...) _log("DEBUG", __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)

/*
 * Recover a pointer to the enclosing `type` from a pointer to its `member`
 * (used to get struct sb_redis back from its embedded ev_timer).
 * Uses a GNU statement expression and __typeof__. Unlike plain `typeof`,
 * __typeof__ is also available in strict ISO modes such as -std=c11.
 */
#define container_of(ptr, type, member) ({ \
    const __typeof__(((type *)0)->member) *container_of_mptr_ = (ptr); \
    (type *)((char *)container_of_mptr_ - offsetof(type, member)); })

/*
 * Non-blocking (hiredis async + libev) subscribe context.
 */
struct sb_redis {
    char *server;                      /* Redis host, owned strdup copy */
    int port;
    int timeout;                       /* seconds: connect/AUTH wait and PING reply timeout */
    char *pwd;                         /* AUTH password or NULL, owned strdup copy */
    char *channel;                     /* channel passed to SUBSCRIBE, owned strdup copy */
    data_recv_func *func;              /* message callback, passed as SUBSCRIBE privdata */
    pthread_t thread;                  /* subscribe loop thread */
    struct redisAsyncContext *ac;      /* SUBSCRIBE connection; NULL once hiredis freed it */
    struct redisAsyncContext *ac_ping; /* dedicated connection for PING health checks */
    struct ev_loop *loop;
    /* One timer reused for connect/AUTH timeout, health-check interval,
     * PING reply timeout and the retry delay. */
    struct ev_timer timeout_timer;
    struct ev_io notify_watcher;       /* watches notify[0] so another thread can break the loop */
    int notify[2];                     /* pipe; -1 in notify[0] means not created */
    int ok;                            /* set by callbacks: 1 success, 0 failure, -1 timeout */
    /* Written by sb_redis_destroy_subscribe_loop, read by the loop thread:
     * must be atomic, volatile does not synchronize threads. */
    atomic_int stop;
};

/*
 * Format one log line and print it to stdout as
 * "YYYY-MM-DD,HH:MM:SS tid:NNNNN [LEVEL file:line] message".
 * The message is formatted into a 2048-byte buffer and truncated beyond that.
 * `func` is supplied by the LOG_* macros but is not printed.
 */
void _log(const char *level, const char *file, int line, const char *func, const char *fmt, ...) {
    char buf[2048];
    va_list args;
    time_t now;
    struct tm tm_now = {0};

    (void)func;

    /* On failure the zeroed struct tm is printed. */
    if (time(&now) != (time_t)-1) {
        localtime_r(&now, &tm_now);
    }

    va_start(args, fmt);
    vsnprintf(buf, sizeof(buf), fmt, args);
    va_end(args); /* every va_start must be paired with va_end */

    printf("%4d-%02d-%02d,%02d:%02d:%02d tid:%5lu [%s %s:%d] %s", tm_now.tm_year + 1900,
           tm_now.tm_mon + 1, tm_now.tm_mday, tm_now.tm_hour, tm_now.tm_min,
           tm_now.tm_sec, (unsigned long)pthread_self() % 100000, level, file, line, buf);
}

/*
 * Reply handler for SUBSCRIBE on sb->ac. privdata carries the user's
 * data_recv_func. Only 3-element array replies whose third element is a
 * string are forwarded. The first (confirmation) reply has an INTEGER there
 * and is skipped. An error reply breaks the event loop.
 */
static void redis_subscribe_callback(redisAsyncContext *ac, void *r, void *privdata) {
    redisReply *msg = r;
    data_recv_func *deliver = privdata;
    struct sb_redis *sb = ac->data;

    if (!msg) {
        LOG_DEBUG("subscribe reply NULL\n");
        return;
    }
    if (msg->type == REDIS_REPLY_ERROR) {
        LOG_ERROR("subscribe failed, %s\n", msg->str);
        ev_break(sb->loop, EVBREAK_ONE);
        return;
    }
    if (msg->type != REDIS_REPLY_ARRAY || msg->elements != 3) {
        LOG_ERROR("subscribe msg invalid\n");
        return;
    }

    redisReply *payload = msg->element[2];
    if (payload->type == REDIS_REPLY_STRING) {
        deliver(payload->str, payload->len);
    }
}


/* Timer callback: records a timeout (ok = -1) and breaks the event loop. */
static void timeout_callback(struct ev_loop *loop, ev_timer *w, int revents) {
    struct sb_redis *owner = container_of(w, struct sb_redis, timeout_timer);

    (void)revents;
    owner->ok = -1;
    ev_break(loop, EVBREAK_ONE);
    LOG_DEBUG("timeout\n");
}



static void live_detect_start(struct ev_loop *loop, ev_timer *w, int revents);

/*
 * Reply handler for the health-check PING sent on sb->ac_ping.
 * A good reply re-arms the shared timer so the next check runs in 5 seconds.
 * A NULL or error reply is logged and breaks the event loop.
 */
static void redis_ping_callback(redisAsyncContext *ac, void *r, void *privdata) {
    struct sb_redis *sb = ac->data;
    redisReply *pong = r;

    (void)privdata;
    if (pong != NULL && pong->type != REDIS_REPLY_ERROR) {
        LOG_DEBUG("live detect ok\n");
        ev_timer_stop(sb->loop, &sb->timeout_timer);
        ev_timer_init(&sb->timeout_timer, live_detect_start, 5, 0);
        ev_timer_start(sb->loop, &sb->timeout_timer);
        return;
    }

    if (pong == NULL) {
        LOG_ERROR("live detect failed\n");
    } else {
        LOG_ERROR("live detect failed, %s\n", pong->str);
    }
    ev_break(sb->loop, EVBREAK_ONE);
}

/*
 * Health-check timer callback. It sends PING on the dedicated sb->ac_ping
 * context, then re-arms the same timer as a PING reply timeout of sb->timeout
 * seconds. If no reply arrives in time, timeout_callback breaks the loop.
 */
static void live_detect_start(struct ev_loop *loop, ev_timer *w, int revents) {
    struct sb_redis *sb = container_of(w, struct sb_redis, timeout_timer);

    (void)revents;

    /* The disconnect callback clears sb->ac_ping and only requests a loop
     * break. This timer may still be dispatched in the same iteration, and
     * redisAsyncCommand must not be given a NULL context. */
    if (sb->ac_ping == NULL) {
        LOG_ERROR("live detect skipped, ping context gone\n");
        ev_break(loop, EVBREAK_ONE);
        return;
    }

    if (redisAsyncCommand(sb->ac_ping, redis_ping_callback, NULL, "PING") != REDIS_OK) {
        LOG_ERROR("redisAsyncCommand PING failed\n");
        ev_break(loop, EVBREAK_ONE);
        return; /* no reply will come, so don't arm a reply timeout */
    }

    LOG_DEBUG("live detect start\n");
    /* Reuse the timer as the PING reply timeout. */
    ev_timer_stop(loop, w);
    ev_timer_init(w, timeout_callback, sb->timeout, 0);
    ev_timer_start(loop, w);
}

/*
 * Connect callback shared by sb->ac and sb->ac_ping.
 * Success sets sb->ok = 1. On failure the matching field is cleared and
 * sb->ok = 0; presumably hiredis frees the context itself after a failed
 * connect, so it must not be freed again. The waiting ev_run is broken either way.
 */
static void redis_connect_callback(const redisAsyncContext *ac, int status) {
    struct sb_redis *sb = ac->data;

    if (status != REDIS_OK) {
        LOG_ERROR("async connect failed, %s\n", ac->errstr);
        if (ac == sb->ac)
            sb->ac = NULL;
        if (ac == sb->ac_ping)
            sb->ac_ping = NULL;
        sb->ok = 0;
    } else {
        LOG_INFO("async connect ok\n");
        sb->ok = 1;
    }
    ev_break(sb->loop, EVBREAK_ONE);
}

/* Reply handler for AUTH: sb->ok becomes 1 on success, 0 on a NULL or error reply; then the loop is broken. */
static void redis_auth_callback(redisAsyncContext *ac, void *r, void *privdata) {
    struct sb_redis *sb = ac->data;
    redisReply *answer = r;
    int failed = (answer == NULL || answer->type == REDIS_REPLY_ERROR);

    (void)privdata;
    if (failed) {
        if (answer)
            LOG_ERROR("async auth failed, %s\n", answer->str);
        else
            LOG_ERROR("async auth failed\n");
    } else {
        LOG_INFO("async auth ok\n");
    }
    sb->ok = failed ? 0 : 1;
    ev_break(sb->loop, EVBREAK_ONE);
}

/*
 * Disconnect callback shared by both contexts. hiredis frees the context
 * after this returns, so the matching field is cleared to prevent a second
 * redisAsyncFree. The loop is broken so the caller can reconnect or exit.
 */
static void redis_disconnect_callback(const redisAsyncContext *ac, int status) {
    struct sb_redis *sb = ac->data;

    if (status != REDIS_OK)
        LOG_ERROR("async disconnect : %s\n", ac->errstr);
    else
        LOG_INFO("async disconnect ok\n");

    if (ac == sb->ac)
        sb->ac = NULL;
    if (ac == sb->ac_ping)
        sb->ac_ping = NULL;
    ev_break(sb->loop, EVBREAK_ONE);
}

static int init_context(struct sb_redis *sb) {
sb->ac = redisAsyncConnect(sb->server, sb->port);

if (sb->ac == NULL) {
LOG_ERROR("redisAsyncConnect failed\n");
goto cleanup;
}
if (sb->ac->err) {
LOG_ERROR("redisAsyncConnect failed, %s\n", sb->ac->errstr);
goto cleanup;
}

sb->ac->data = sb;

if (redisLibevAttach(sb->loop, sb->ac) != REDIS_OK) {
LOG_ERROR("redisLibevAttach failed\n");
goto cleanup;
}

if (redisAsyncSetConnectCallback(sb->ac, redis_connect_callback) != REDIS_OK) {
LOG_ERROR("redisAsyncSetConnectCallback failed\n");
goto cleanup;
}
if (redisAsyncSetDisconnectCallback(sb->ac, redis_disconnect_callback) != REDIS_OK) {
LOG_ERROR("redisAsyncSetDisconnectCallback failed\n");
goto cleanup;
}

// connect
LOG_INFO("async connect start\n");
sb->ok = 0;
if (sb->timeout > 0) {
ev_now_update(sb->loop);
ev_timer_init(&sb->timeout_timer, timeout_callback, sb->timeout, 0);
ev_timer_start(sb->loop, &sb->timeout_timer);
}
ev_run(sb->loop, 0);
if (sb->timeout > 0) {
ev_timer_stop(sb->loop, &sb->timeout_timer);
}
if (sb->ok > 0) {
/* 连接完成 */
} else {
if (sb->ok < 0) {
/* timer timeout */
} else {
/* 连接失败 */
}
goto cleanup;
}

// auth
if (sb->pwd) {
LOG_INFO("async auth start\n");
if (redisAsyncCommand(sb->ac, redis_auth_callback, NULL, "AUTH %s", sb->pwd) != REDIS_OK) {
LOG_ERROR("redisAsyncCommand AUTH failed\n");
goto cleanup;
}
sb->ok = 0;
if (sb->timeout > 0) {
ev_now_update(sb->loop);
ev_timer_set(&sb->timeout_timer, sb->timeout, 0);
ev_timer_start(sb->loop, &sb->timeout_timer);
}
ev_run(sb->loop, 0);
if (sb->timeout > 0) {
ev_timer_stop(sb->loop, &sb->timeout_timer);
}
if (sb->ok > 0) {
/* auth完成 */
} else {
if (sb->ok < 0) {
/* timer timeout */
} else {
/* auth失败 */
}
goto cleanup;
}
}

// ping context
sb->ac_ping = redisAsyncConnect(sb->server, sb->port);

if (sb->ac_ping == NULL) {
LOG_ERROR("redisAsyncConnect failed\n");
goto cleanup;
}
if (sb->ac_ping->err) {
LOG_ERROR("redisAsyncConnect failed, %s\n", sb->ac->errstr);
goto cleanup;
}

sb->ac_ping->data = sb;

if (redisLibevAttach(sb->loop, sb->ac_ping) != REDIS_OK) {
LOG_ERROR("redisLibevAttach failed\n");
goto cleanup;
}

if (redisAsyncSetConnectCallback(sb->ac_ping, redis_connect_callback) != REDIS_OK) {
LOG_ERROR("redisAsyncSetConnectCallback failed\n");
goto cleanup;
}
if (redisAsyncSetDisconnectCallback(sb->ac_ping, redis_disconnect_callback) != REDIS_OK) {
LOG_ERROR("redisAsyncSetDisconnectCallback failed\n");
goto cleanup;
}

// connect
LOG_INFO("async connect start\n");
sb->ok = 0;
if (sb->timeout > 0) {
ev_now_update(sb->loop);
ev_timer_set(&sb->timeout_timer, sb->timeout, 0);
ev_timer_start(sb->loop, &sb->timeout_timer);
}
ev_run(sb->loop, 0);
if (sb->timeout > 0) {
ev_timer_stop(sb->loop, &sb->timeout_timer);
}
if (sb->ok > 0) {
/* 连接完成 */
} else {
if (sb->ok < 0) {
/* timer timeout */
} else {
/* 连接失败 */
}
goto cleanup;
}

// auth
if (sb->pwd) {
LOG_INFO("async auth start\n");
if (redisAsyncCommand(sb->ac_ping, redis_auth_callback, NULL, "AUTH %s", sb->pwd) != REDIS_OK) {
LOG_ERROR("redisAsyncCommand AUTH failed\n");
goto cleanup;
}
sb->ok = 0;
if (sb->timeout > 0) {
ev_now_update(sb->loop);
ev_timer_set(&sb->timeout_timer, sb->timeout, 0);
ev_timer_start(sb->loop, &sb->timeout_timer);
}
ev_run(sb->loop, 0);
if (sb->timeout > 0) {
ev_timer_stop(sb->loop, &sb->timeout_timer);
}
if (sb->ok > 0) {
/* auth完成 */
} else {
if (sb->ok < 0) {
/* timer timeout */
} else {
/* auth失败 */
}
goto cleanup;
}
}

return 0;

cleanup:
if (sb->ac) {
redisAsyncFree(sb->ac);
sb->ac = NULL;
}
if (sb->ac_ping) {
redisAsyncFree(sb->ac_ping);
sb->ac_ping = NULL;
}
return -1;
}


/* Drains the notify pipe; breaks the loop when at least one byte was read. */
static void notify_callback(struct ev_loop *loop, struct ev_io *w, int revents) {
    char drain[32];
    int got;

    (void)revents;
    got = read(w->fd, drain, sizeof drain);
    if (got > 0)
        ev_break(loop, EVBREAK_ONE);

    LOG_DEBUG("notify read %d\n", got);
}

static void *thread_loop(void *arg) {
struct sb_redis *sb = arg;
int r;
ev_now_update(sb->loop);
ev_io_init(&sb->notify_watcher, notify_callback, sb->notify[0], EV_READ);
ev_io_start(sb->loop, &sb->notify_watcher);

begin:

LOG_INFO("async redis subscribe loop start\n");
// 启动健康检查
ev_timer_init(&sb->timeout_timer, live_detect_start, 5, 0);
ev_timer_start(sb->loop, &sb->timeout_timer);

ev_run(sb->loop, 0);

// 无论什么原因退出了,都关掉定时器
ev_timer_stop(sb->loop, &sb->timeout_timer);
LOG_INFO("async redis subscribe loop stop\n");

if (sb->stop)
return NULL;

reset:
// 到这里说明出问题了
if (sb->ac)
redisAsyncFree(sb->ac);
if (sb->ac_ping)
redisAsyncFree(sb->ac_ping);

if (sb->stop)
return NULL;

LOG_INFO("reset starting\n");

r = init_context(sb);

if (sb->stop)
return NULL;

if (r != 0) {
// 重连失败,进入休息
LOG_ERROR("reset failed\n");
} else {
// 重连成功,重新subscribe
LOG_INFO("reset ok\n");
if (redisAsyncCommand(sb->ac, redis_subscribe_callback, sb->func, "SUBSCRIBE %s", sb->channel) != REDIS_OK) {
// 失败,进入休息
LOG_ERROR("subscribe failed\n");
} else {
goto begin;
}
}

// 休息10秒再试
LOG_ERROR("bad happened, sleep 10 seconds\n");

ev_timer_set(&sb->timeout_timer, 10, 0);
ev_timer_init(&sb->timeout_timer, timeout_callback, 10, 0);
ev_timer_start(sb->loop, &sb->timeout_timer);
ev_run(sb->loop, 0);

goto reset;
}

/*
 * Allocate a subscriber for `channel` on server:port. Before returning, it
 * synchronously connects both contexts and authenticates them if `pwd` is
 * set (init_context).
 *
 * pwd may be NULL (no AUTH); server and channel must not be NULL. The
 * function keeps copies of server, pwd and channel. timeout is in seconds.
 * Process-wide side effect: SIGPIPE is set to SIG_IGN.
 *
 * Returns the new object, or NULL on failure (everything already released).
 */
struct sb_redis *sb_redis_new(const char *server, int port, const char *pwd, int timeout, const char *channel, data_recv_func *func) {
    struct sb_redis *sb;
    int fds[2];

    /* A write to a socket closed by the server must fail with EPIPE instead
     * of terminating the process. */
    signal(SIGPIPE, SIG_IGN);

    /* strdup(NULL) is undefined behavior. */
    if (server == NULL || channel == NULL) {
        LOG_ERROR("server and channel must not be NULL\n");
        return NULL;
    }

    sb = calloc(1, sizeof(*sb));
    if (sb == NULL) {
        LOG_ERROR("out of memory\n");
        return NULL;
    }
    sb->func = func;
    sb->timeout = timeout;
    sb->port = port;
    /* notify[0] == -1 tells sb_redis_cleanup that no pipe exists yet. */
    sb->notify[0] = -1;
    sb->notify[1] = -1;

    sb->channel = strdup(channel);
    sb->server = strdup(server);
    if (pwd != NULL)
        sb->pwd = strdup(pwd);
    if (sb->channel == NULL || sb->server == NULL || (pwd != NULL && sb->pwd == NULL)) {
        LOG_ERROR("out of memory\n");
        goto cleanup;
    }

    /* Create the pipe in a local array so sb->notify stays -1 on failure. */
    if (pipe(fds)) {
        LOG_ERROR("pipe failed, %s\n", strerror(errno));
        goto cleanup;
    }
    sb->notify[0] = fds[0];
    sb->notify[1] = fds[1];

    sb->loop = ev_loop_new(EVFLAG_AUTO);
    if (sb->loop == NULL) {
        LOG_ERROR("ev_loop_new failed\n");
        goto cleanup;
    }

    if (init_context(sb)) {
        goto cleanup;
    }

    return sb;

cleanup:
    sb_redis_cleanup(sb);
    return NULL;
}


/*
 * Queue SUBSCRIBE <channel> on the connected sb->ac, then start the loop
 * thread that delivers messages and handles health checks and reconnects.
 * Returns 0 on success, or -1 on failure (the cause is logged).
 */
int sb_redis_create_subscribe_loop(struct sb_redis *sb) {
    int rc;

    if (sb == NULL || sb->ac == NULL) {
        LOG_ERROR("subscribe loop needs a connected context\n");
        return -1;
    }

    if (redisAsyncCommand(sb->ac, redis_subscribe_callback, sb->func, "SUBSCRIBE %s", sb->channel) != REDIS_OK) {
        LOG_ERROR("redisAsyncCommand SUBSCRIBE failed\n");
        return -1;
    }

    /* pthread_create returns the error code instead of setting errno. */
    rc = pthread_create(&sb->thread, NULL, thread_loop, sb);
    if (rc != 0) {
        LOG_ERROR("pthread_create failed, %s\n", strerror(rc));
        return -1;
    }
    return 0;
}

/*
 * Ask the loop thread to exit and wait for it. The function sets sb->stop,
 * then writes one byte to the notify pipe to wake the thread's ev_run.
 * Only call this after sb_redis_create_subscribe_loop succeeded, because it
 * joins sb->thread.
 */
void sb_redis_destroy_subscribe_loop(struct sb_redis *sb) {
    ssize_t n;

    sb->stop = 1;

    /* Retry if a signal interrupts the write. Without the wake-up byte the
     * thread may never leave ev_run, and pthread_join would block forever. */
    do {
        n = write(sb->notify[1], "e", 1);
    } while (n < 0 && errno == EINTR);

    if (n > 0) {
        LOG_DEBUG("notify write %d\n", (int)n);
    } else {
        LOG_ERROR("notify write %d, %s\n", (int)n, strerror(errno));
    }
    pthread_join(sb->thread, NULL);
}

/*
 * Release everything owned by sb: both hiredis contexts, the libev loop, the
 * notify pipe, the string copies and sb itself. As with free(), NULL is
 * accepted and ignored. If the loop thread was started, call
 * sb_redis_destroy_subscribe_loop first.
 */
void sb_redis_cleanup(struct sb_redis *sb) {
    if (sb == NULL)
        return;

    /* Free the contexts before destroying the loop: their callbacks
     * (e.g. disconnect) call ev_break on sb->loop. */
    if (sb->ac) {
        redisAsyncFree(sb->ac);
        sb->ac = NULL;
    }
    if (sb->ac_ping) {
        redisAsyncFree(sb->ac_ping);
        sb->ac_ping = NULL;
    }
    if (sb->loop != NULL) {
        ev_loop_destroy(sb->loop);
        sb->loop = NULL;
    }
    /* notify[0] == -1 means the pipe was never created. */
    if (sb->notify[0] != -1) {
        close(sb->notify[0]);
        close(sb->notify[1]);
    }
    free(sb->pwd);
    free(sb->server);
    free(sb->channel);
    free(sb);
}
main.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>

#include "sb_redis.h"

#define TEST_SERVER "192.168.1.1"
#define TEST_PORT 6379
#define TEST_PWD "xxxxxx"
#define TEST_CHANNEL "test"

/* Intended timeout (seconds) for sb_redis_new. */
#define TIMEOUT 10

/* Set from the signal handler and polled by main(). volatile sig_atomic_t is
 * the only object type a signal handler may portably write. */
static volatile sig_atomic_t _stop = 0;

/* Signal handler: only raises the flag polled by main(); it does no other work. */
static void term_handler(int sig) {
    (void)sig;
    _stop = 1;
}

/*
 * Demo message callback: prints the payload length and exactly `len` payload
 * bytes. fwrite is used instead of "%s", so embedded NUL bytes and missing
 * terminators are handled. A NULL or empty payload prints nothing after "str: ".
 * Always returns 0.
 */
static int recv_func(char *data, int len) {
    printf("recv_func len: %d, str: ", len);
    if (data != NULL && len > 0)
        fwrite(data, 1, (size_t)len, stdout);
    putchar('\n');
    return 0;
}

int main() {

signal(SIGINT, term_handler);

struct sb_redis *sb;

sb = sb_redis_new(TEST_SERVER, TEST_PORT, TEST_PWD, 10, TEST_CHANNEL, recv_func);
if (!sb) {
printf("sb_redis_new failed\n");
return -1;
}

if (sb_redis_create_subscribe_loop(sb) != 0) {
printf("sb_redis_create_subscribe_loop failed\n");
sb_redis_cleanup(sb);
return -1;
}

while (!_stop) {
sleep(1);
}

_stop = 0;

sb_redis_destroy_subscribe_loop(sb);

sb_redis_cleanup(sb);

return 0;
}