-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathaof.c
executable file
·1858 lines (1674 loc) · 78.9 KB
/
aof.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "redis.h"
#include "bio.h"
#include "rio.h"
#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
void aofUpdateCurrentSize(void);
void aofClosePipes(void);
/* ----------------------------------------------------------------------------
* AOF rewrite buffer implementation.
* AOF重写缓存的实现
*
* The following code implement a simple buffer used in order to accumulate
* changes while the background process is rewriting the AOF file.
* 下面的代码实现了一个简单的缓存功能,用来在后台执行AOF文件重写(即BGREWRITEAOF命令)时
* 积攒所有修改数据库的操作。
*
* We only need to append, but can't just use realloc with a large block
* because 'huge' reallocs are not always handled as one could expect
* (via remapping of pages at OS level) but may involve copying data.
* 对于这个缓存,我们只需要append操作,但是我们无法分配一个非常大的空间。
* 因为并不总是能成功分配一个非常大的空间
*
* For this reason we use a list of blocks, every block is
* AOF_RW_BUF_BLOCK_SIZE bytes.
* 因此我们使用多个大小为AOF_RW_BUF_BLOCK_SIZE字节的空间来实现缓存功能。
*
* !!!!!!!!!这里约定:把每个大小为AOF_RW_BUF_BLOCK_SIZE字节的空间称作一个“缓冲区” !!!!!!!!!!!!!
* ------------------------------------------------------------------------- */
/* Capacity of one AOF rewrite buffer block. The rewrite buffer grows by
 * appending whole blocks of this size to a list rather than realloc()ing a
 * single huge region. */
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */

/* One block of the AOF rewrite buffer.
 * 'used' is the number of bytes of 'buf' that currently hold data;
 * 'free' is the number of bytes still available for appending at the tail. */
typedef struct aofrwblock {
    unsigned long used;                 /* Bytes of buf holding data. */
    unsigned long free;                 /* Bytes of buf still appendable. */
    char buf[AOF_RW_BUF_BLOCK_SIZE];    /* Block payload. */
} aofrwblock;
/* This function free the old AOF rewrite buffer if needed, and initialize
* a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
* so can be used for the first initialization as well. */
/* 这个函数用来释放旧的AOF重写缓存并初始化一个新的AOF重写缓存。
这个函数会检测server.aof_rewrite_buf_blocks是否为NULL,因此也可以用于AOF重写缓存的初始化。*/
void aofRewriteBufferReset(void) {
// 释放旧的AOF重写缓存,实际就是释放链表
if (server.aof_rewrite_buf_blocks)
listRelease(server.aof_rewrite_buf_blocks);
// 初始化新的AOF缓存,实际就是创建一个链表
server.aof_rewrite_buf_blocks = listCreate();
listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}
/* Return the current size of the AOF rewrite buffer. */
/* 返回AOF重写缓存的当前已使用的空间字节数 */
unsigned long aofRewriteBufferSize(void) {
listNode *ln;
listIter li;
unsigned long size = 0;
// 初始化链表迭代器
listRewind(server.aof_rewrite_buf_blocks,&li);
while((ln = listNext(&li))) {
// 获取描述当前缓冲区的aofrwblock结构体
aofrwblock *block = listNodeValue(ln);
// 更新已使用空间大小
size += block->used;
}
return size;
}
/* Writable-event handler on server.aof_pipe_write_data_to_child. It pushes
 * the accumulated rewrite-buffer data to the child doing the AOF rewrite.
 * Sending the diff incrementally keeps the final write, done when the child
 * finishes, small.
 *
 * Blocks are consumed from the head of the list and fully drained blocks
 * are removed. The handler unregisters itself when the buffer is empty or
 * when server.aof_stop_sending_diff is set. If write() makes no progress,
 * it returns and stays registered so a later event retries. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(fd);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    for (;;) {
        listNode *head = listFirst(server.aof_rewrite_buf_blocks);
        aofrwblock *blk = (head != NULL) ? head->value : NULL;

        /* Nothing left to send, or sending has been stopped. */
        if (blk == NULL || server.aof_stop_sending_diff) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }

        if (blk->used != 0) {
            ssize_t sent = write(server.aof_pipe_write_data_to_child,
                                 blk->buf,blk->used);

            /* No progress (or error): try again on the next event. */
            if (sent <= 0) return;

            /* Shift the unsent remainder to the front of the block. */
            memmove(blk->buf,blk->buf+sent,blk->used-sent);
            blk->used -= sent;
        }

        /* A drained block is dropped from the head of the list. */
        if (blk->used == 0) listDelNode(server.aof_rewrite_buf_blocks,head);
    }
}
/* Append 'len' bytes starting at 's' to the AOF rewrite buffer.
 *
 * Data first fills whatever room is left in the last block. Any remainder
 * goes into newly allocated AOF_RW_BUF_BLOCK_SIZE blocks added to the tail
 * of the list, and buffer growth is logged periodically. Finally, a
 * writable file event running aofChildWriteDiffData() is installed on the
 * pipe to the rewrite child, if none is registered yet. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *tail = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *blk = tail ? tail->value : NULL;

    while (len > 0) {
        /* Copy as much as fits into the current tail block. */
        if (blk != NULL) {
            unsigned long chunk = (len > blk->free) ? blk->free : len;

            if (chunk > 0) {
                memcpy(blk->buf+blk->used,s,chunk);
                blk->used += chunk;
                blk->free -= chunk;
                s += chunk;
                len -= chunk;
            }
        }

        /* No block yet, or the tail block is full: start a new one. */
        if (len > 0) {
            int nblocks;

            blk = zmalloc(sizeof(*blk));
            blk->used = 0;
            blk->free = AOF_RW_BUF_BLOCK_SIZE;
            listAddNodeTail(server.aof_rewrite_buf_blocks,blk);

            /* Report growth: NOTICE when (blocks+1) is a multiple of 10,
             * WARNING when it is also a multiple of 100. */
            nblocks = listLength(server.aof_rewrite_buf_blocks);
            if ((nblocks+1) % 10 == 0) {
                int level = ((nblocks+1) % 100 == 0) ? REDIS_WARNING
                                                     : REDIS_NOTICE;
                redisLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }

    /* Make sure a writer event exists to stream the data to the child. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el,server.aof_pipe_write_data_to_child,
            AE_WRITABLE,aofChildWriteDiffData,NULL);
    }
}
/* Write the whole AOF rewrite buffer (possibly spanning several blocks) to
 * 'fd', block by block.
 *
 * Returns the total number of bytes written, or -1 on error. A short write
 * counts as an error. In that case errno is set to EIO unless write(2)
 * itself failed with -1 and set it, so callers that report strerror(errno)
 * never print a stale value. */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        ssize_t nwritten;

        /* Empty blocks contribute nothing. */
        if (block->used == 0) continue;

        nwritten = write(fd,block->buf,block->used);
        if (nwritten != (ssize_t)block->used) {
            /* write(2) only sets errno when it returns -1. A zero-length or
             * partial write (e.g. the disk filling up) leaves errno
             * untouched and possibly stale, so give it a meaningful value. */
            if (nwritten != -1) errno = EIO;
            return -1;
        }
        count += nwritten;
    }
    return count;
}
/* ----------------------------------------------------------------------------
* AOF file implementation
* AOF文件的实现
* ------------------------------------------------------------------------- */
/* Queue a REDIS_BIO_AOF_FSYNC background job that fsync()s 'fd' (the AOF
 * file descriptor) in another thread, so the caller does not wait on it. */
void aof_background_fsync(int fd) {
    /* Background jobs take pointer-sized arguments; smuggle the fd in one. */
    void *job_arg = (void*)(long)fd;

    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,job_arg,NULL,NULL);
}
/* Turn AOF off at runtime ("appendonly yes" -> "appendonly no" via CONFIG).
 *
 * The pending AOF buffer is force-flushed and fsync'ed, then the file is
 * closed and the AOF state reset. If a background AOF rewrite is running,
 * its child is sent SIGUSR1 and waited for. Then the rewrite buffer, the
 * child's temp file and the IPC pipes are cleaned up. */
void stopAppendOnly(void) {
    int statloc;

    redisAssert(server.aof_state != REDIS_AOF_OFF);

    /* Persist whatever is buffered, then close the AOF. */
    flushAppendOnlyFile(1);
    aof_fsync(server.aof_fd);
    close(server.aof_fd);

    server.aof_fd = -1;
    server.aof_selected_db = -1;
    server.aof_state = REDIS_AOF_OFF;

    /* No rewrite child: nothing more to tear down. */
    if (server.aof_child_pid == -1) return;

    redisLog(REDIS_NOTICE,"Killing running AOF rewrite child: %ld",
        (long) server.aof_child_pid);
    if (kill(server.aof_child_pid,SIGUSR1) != -1)
        wait3(&statloc,0,NULL);

    /* Drop the diff accumulated for the child and its temp file. */
    aofRewriteBufferReset();
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
    server.aof_rewrite_time_start = -1;

    /* Close the pipes used for IPC between parent and child. */
    aofClosePipes();
}
/* Turn AOF on at runtime ("appendonly no" -> "appendonly yes" via CONFIG).
 *
 * Opens (creating it if needed) the append only file and starts a
 * background AOF rewrite. On success the state becomes
 * REDIS_AOF_WAIT_REWRITE: data is appended to the file only once that
 * rewrite has completed.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR if the file cannot be
 * opened or the background rewrite cannot be started; in both cases
 * server.aof_fd is left at -1. */
int startAppendOnly(void) {
    /* Check the precondition before acquiring any resource. */
    redisAssert(server.aof_state == REDIS_AOF_OFF);

    server.aof_last_fsync = server.unixtime;
    server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
    if (server.aof_fd == -1) {
        redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't open the append only file: %s",strerror(errno));
        return REDIS_ERR;
    }

    if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
        close(server.aof_fd);
        /* Forget the closed descriptor. Its number can be handed out again
         * by the next open()/accept(), and code that tests aof_fd != -1
         * must not mistake that unrelated fd for the AOF. */
        server.aof_fd = -1;
        redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
        return REDIS_ERR;
    }

    /* AOF is switched on; appending to disk waits for the rewrite to end. */
    server.aof_state = REDIS_AOF_WAIT_REWRITE;
    return REDIS_OK;
}
/* Write the append only file buffer (server.aof_buf) on disk.
 *
 * The AOF must be written before replying to the client, and a client
 * socket can only receive its reply once the event loop is entered. So all
 * AOF writes are accumulated in a memory buffer, and this function writes
 * it to disk just before the event loop is entered again.
 *
 * About the 'force' argument:
 *
 * With the 'everysec' fsync policy the write may be delayed while an
 * fsync() is still running in the background thread, since e.g. on Linux
 * write(2) would be blocked by that fsync anyway. When this happens the
 * postponement is remembered in server.aof_flush_postponed_start and a
 * later call (e.g. from serverCron()) retries. After two seconds of
 * postponement the write is performed anyway.
 *
 * If 'force' is 1, the buffer is written regardless of background fsyncs.
 *
 * On a write error with the 'always' fsync policy the process exits.
 * Otherwise server.aof_last_write_status becomes REDIS_ERR, the unwritten
 * data stays in the buffer for the next call, and the cause is stored in
 * server.aof_last_write_errno. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int write_errno;
    int sync_in_progress = 0;
    mstime_t latency;

    /* Nothing buffered: nothing to do. */
    if (sdslen(server.aof_buf) == 0) return;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds (the write may block behind the fsync). */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    latencyStartMonitor(latency);
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    /* Save errno right away: the latency bookkeeping and the redisLog()
     * calls below perform library calls that may overwrite it. */
    write_errno = errno;
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    if (nwritten != (signed)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                    strerror(write_errno));
            }
            /* Record the cause even when the log line is rate-limited, so
             * aof_last_write_errno always describes the latest failure. */
            server.aof_last_write_errno = write_errno;
        } else {
            /* Partial write: some bytes reached the file. */
            if (can_log) {
                redisLog(REDIS_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            /* Try to cut the partial data off the end of the file. */
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                             "from the append-only file. Redis may refuse "
                             "to load the AOF the next time it starts. "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = REDIS_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        /* At most one background fsync per second under 'everysec'. */
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
/* Append to 'dst' the multi-bulk encoding of a command with 'argc'
 * arguments 'argv', in the form stored in the AOF:
 *
 *   *<argc>\r\n
 *   $<len(arg)>\r\n<arg>\r\n      (repeated for each argument)
 *
 * Each argument is passed through getDecodedObject() so its ptr can be read
 * as an sds string. Returns the (possibly reallocated) 'dst'. */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char hdr[32];
    int hdrlen;
    int i;

    /* Multi-bulk header: "*<argc>\r\n". */
    hdr[0] = '*';
    hdrlen = 1 + ll2string(hdr+1,sizeof(hdr)-1,argc);
    hdr[hdrlen++] = '\r';
    hdr[hdrlen++] = '\n';
    dst = sdscatlen(dst,hdr,hdrlen);

    for (i = 0; i < argc; i++) {
        robj *arg = getDecodedObject(argv[i]);
        size_t arglen = sdslen(arg->ptr);

        /* Bulk header "$<len>\r\n", then the payload and a closing CRLF. */
        hdr[0] = '$';
        hdrlen = 1 + ll2string(hdr+1,sizeof(hdr)-1,arglen);
        hdr[hdrlen++] = '\r';
        hdr[hdrlen++] = '\n';
        dst = sdscatlen(dst,hdr,hdrlen);
        dst = sdscatlen(dst,arg->ptr,arglen);
        dst = sdscatlen(dst,"\r\n",2);

        decrRefCount(arg);
    }
    return dst;
}
/* Append to 'buf' a PEXPIREAT command equivalent to the expire carried by
 * 'cmd' on 'key'. 'seconds' is that command's time argument.
 *
 * EXPIRE, PEXPIRE, EXPIREAT, SETEX and PSETEX are translated this way so
 * the AOF stores an absolute time in milliseconds instead of a relative
 * TTL, retaining precision. Returns the updated buf. */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
    long long when;
    robj *argv[3];
    int in_seconds, relative;

    /* Commands whose time argument is expressed in seconds. */
    in_seconds = cmd->proc == expireCommand ||
                 cmd->proc == setexCommand ||
                 cmd->proc == expireatCommand;
    /* Commands whose time argument is relative to now. */
    relative = cmd->proc == expireCommand ||
               cmd->proc == pexpireCommand ||
               cmd->proc == setexCommand ||
               cmd->proc == psetexCommand;

    /* Decode first so ->ptr is a string strtoll() can parse. */
    seconds = getDecodedObject(seconds);
    when = strtoll(seconds->ptr,NULL,10);
    decrRefCount(seconds);

    if (in_seconds) when *= 1000;    /* -> milliseconds */
    if (relative) when += mstime();  /* -> absolute time */

    argv[0] = createStringObject("PEXPIREAT",9);
    argv[1] = key;
    argv[2] = createStringObjectFromLongLong(when);
    buf = catAppendOnlyGenericCommand(buf, 3, argv);
    decrRefCount(argv[0]);
    decrRefCount(argv[2]);
    return buf;
}
/* Feed one command, executed against DB 'dictid', to the AOF.
 *
 * The command is encoded in AOF form, preceded by a SELECT when 'dictid'
 * differs from the DB of the previously fed command. The encoding is then:
 *  - appended to server.aof_buf when the AOF state is REDIS_AOF_ON (that
 *    buffer is flushed to disk before re-entering the event loop, i.e.
 *    before clients get their reply), and
 *  - appended to the AOF rewrite buffer while a rewrite child is running,
 *    so the differences can be appended to the rewritten file.
 *
 * EXPIRE/PEXPIRE/EXPIREAT become PEXPIREAT; SETEX/PSETEX become SET plus
 * PEXPIREAT. Any other command is stored as-is. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();

    /* Target DB changed since the last fed command: emit a SELECT. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* SETEX/PSETEX key ttl value -> SET key value; PEXPIREAT key ms */
        robj *setargv[3];

        setargv[0] = createStringObject("SET",3);
        setargv[1] = argv[1];
        setargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,setargv);
        decrRefCount(setargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
               cmd->proc == expireatCommand) {
        /* EXPIRE/PEXPIRE/EXPIREAT -> PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* No translation needed, or the same translation was already
         * applied to argv for replication. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* A rewrite is in progress: accumulate the diff for the child. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}
/* ----------------------------------------------------------------------------
* AOF loading 加载AOF文件
* ------------------------------------------------------------------------- */
/* Create a connection-less client (fd -1) used to execute the commands read
 * from the AOF: commands in Redis always run in the context of a client.
 *
 * The client starts on DB 0 with empty query/reply state and an initialized
 * MULTI state. Its replstate is REDIS_REPL_WAIT_BGSAVE_START (a slave
 * waiting for synchronization) so that Redis will not try to send it
 * replies. */
struct redisClient *createFakeClient(void) {
    struct redisClient *c = zmalloc(sizeof(*c));

    /* Identity and connection. */
    selectDb(c,0);
    c->fd = -1;
    c->name = NULL;
    c->peerid = NULL;
    c->flags = 0;
    c->btype = REDIS_BLOCKED_NONE;
    c->replstate = REDIS_REPL_WAIT_BGSAVE_START;

    /* Input side. */
    c->querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->argc = 0;
    c->argv = NULL;

    /* Output side. */
    c->bufpos = 0;
    c->reply = listCreate();
    listSetFreeMethod(c->reply,decrRefCountVoid);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;

    /* WATCH and MULTI/EXEC state. */
    c->watched_keys = listCreate();
    initClientMultiState(c);
    return c;
}
/* Release the fake client's argument vector: drop one reference to each of
 * its first c->argc arguments, then free the array itself. c->argv and
 * c->argc are not reset. */
void freeFakeClientArgv(struct redisClient *c) {
    int i;

    for (i = 0; i < c->argc; i++) {
        decrRefCount(c->argv[i]);
    }
    zfree(c->argv);
}
/* Free a client created by createFakeClient(): its watched-keys and reply
 * lists, its query buffer, its MULTI state and finally the structure.
 * The argv array is not released here; see freeFakeClientArgv(). */
void freeFakeClient(struct redisClient *c) {
    /* Per-client containers first... */
    listRelease(c->watched_keys);
    listRelease(c->reply);
    sdsfree(c->querybuf);
    freeClientMultiState(c);

    /* ...then the client itself. */
    zfree(c);
}
/* Replay the append only file 'filename' by executing every command it
 * contains through a fake client.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR on the non fatal error of
 * a zero-length file; server.aof_current_size is then set to 0.
 *
 * On fatal errors an error message is logged and the process exits. Fatal
 * errors are: the file can't be opened, a read error, a bad format, an
 * unknown command, or an unexpected EOF that can't be truncated away.
 *
 * When server.aof_load_truncated is set, a file ending in the middle of a
 * command (or inside MULTI) is truncated right after the last complete
 * command, and loading succeeds. */
int loadAppendOnlyFile(char *filename) {
    /* Client in whose context the loaded commands are executed. */
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;
    off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */

    /* An existing but empty AOF: non fatal, report REDIS_ERR. */
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    server.aof_state = REDIS_AOF_OFF;

    fakeClient = createFakeClient();
    /* Enter the loading state (startLoading is implemented elsewhere). */
    startLoading(fp);

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Serve the clients from time to time (every 1000 commands) and
         * report loading progress. */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
            processEventsWhileBlocked();
        }

        /* Read the "*<argc>" header line. A clean EOF here ends loading. */
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        if (buf[0] != '*') goto fmterr;
        if (buf[1] == '\0') goto readerr;
        argc = atoi(buf+1);
        if (argc < 1) goto fmterr;

        /* The argv array is attached to the fake client so that
         * freeFakeClientArgv() can release it. */
        argv = zmalloc(sizeof(robj*)*argc);
        fakeClient->argc = argc;
        fakeClient->argv = argv;

        /* Each argument is a "$<len>" line, <len> payload bytes, and CRLF. */
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) {
                fakeClient->argc = j; /* Free up to j-1. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
            /* fmterr exits the process, so argv is not freed on this path. */
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
            if (len && fread(argsds,len,1,fp) == 0) {
                sdsfree(argsds);
                fakeClient->argc = j; /* Free up to j-1. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) {
                fakeClient->argc = j+1; /* Free up to j. */
                freeFakeClientArgv(fakeClient);
                goto readerr; /* discard CRLF */
            }
        }

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }

        /* Run the command in the context of a fake client */
        cmd->proc(fakeClient);

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        freeFakeClientArgv(fakeClient);
        /* Remember where the last complete command ends, in case the file
         * must later be truncated there. */
        if (server.aof_load_truncated) valid_up_to = ftello(fp);
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, treat it as an
     * unexpected EOF (truncate if allowed, otherwise log and quit). */
    if (fakeClient->flags & REDIS_MULTI) goto uxeof;

loaded_ok: /* DB loaded, cleanup and return REDIS_OK to the caller. */
    fclose(fp);
    freeFakeClient(fakeClient);
    /* Restore the AOF state saved before loading. */
    server.aof_state = old_aof_state;
    stopLoading();
    aofUpdateCurrentSize();
    server.aof_rewrite_base_size = server.aof_current_size;
    return REDIS_OK;

readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
    if (!feof(fp)) {
        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
        exit(1);
    }

uxeof: /* Unexpected AOF end of file. */
    if (server.aof_load_truncated) {
        redisLog(REDIS_WARNING,"!!! Warning: short read while loading the AOF file !!!");
        redisLog(REDIS_WARNING,"!!! Truncating the AOF at offset %llu !!!",
            (unsigned long long) valid_up_to);
        /* Cut the file right after the last complete command. */
        if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
            if (valid_up_to == -1) {
                redisLog(REDIS_WARNING,"Last valid command offset is invalid");
            } else {
                redisLog(REDIS_WARNING,"Error truncating the AOF file: %s",
                    strerror(errno));
            }
        } else {
            /* Make sure the AOF file descriptor points to the end of the
             * file after the truncate call. */
            if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
                redisLog(REDIS_WARNING,"Can't seek the end of the AOF file: %s",
                    strerror(errno));
            } else {
                redisLog(REDIS_WARNING,
                    "AOF loaded anyway because aof-load-truncated is enabled");
                goto loaded_ok;
            }
        }
    }
    /* Truncation disabled or failed: give up. */
    redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
    exit(1);

fmterr: /* Format error. */
    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
    exit(1);
}
/* ----------------------------------------------------------------------------
* AOF rewrite AOF重写
* ------------------------------------------------------------------------- */
/* 下面这些函数在rio.h中声明(实现位于rio.c)
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);
*/
/* Serialize a string object into the rio stream as a single bulk item.
 *
 * - REDIS_ENCODING_INT objects carry their integer in the obj->ptr slot
 *   itself; they are emitted through rioWriteBulkLongLong().
 * - SDS-encoded objects (sdsEncodedObject() true) are emitted through
 *   rioWriteBulkString() using sdslen() bytes of obj->ptr.
 * - Any other encoding is a programming error and triggers redisPanic().
 *
 * getDecodedObject() is intentionally not used: this function is often
 * called from a child process, and avoiding a decoded copy helps
 * copy-on-write.
 *
 * It lives here rather than in rio.c so that rio.c does not need to pull in
 * the redis.h dependency.
 *
 * Returns the value of the underlying rioWriteBulk* call; callers in this
 * file treat 0 as a write failure.
 *
 * 将参数obj中的字符串或long long类型整数值写入rio对象中。 */
int rioWriteBulkObject(rio *r, robj *obj) {
    int is_int_encoded = (obj->encoding == REDIS_ENCODING_INT);

    if (is_int_encoded) {
        /* The integer value is stored directly in the pointer field. */
        long value = (long)obj->ptr;
        return rioWriteBulkLongLong(r, value);
    }

    if (sdsEncodedObject(obj)) {
        char *str = obj->ptr;
        return rioWriteBulkString(r, str, sdslen(obj->ptr));
    }

    redisPanic("Unknown string encoding");
}
/* Emit the commands needed to rebuild a list object.
* The function returns 0 on error, 1 on success. */
/* 将重建列表list对象需要的命令(即RPUSH命令)写入rio对象中。
该函数出错返回0,成功返回1。*/
int rewriteListObject(rio *r, robj *key, robj *o) {
long long count = 0, items = listTypeLength(o);
// 处理ziplist编码的list对象
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = o->ptr;
unsigned char *p = ziplistIndex(zl,0);
unsigned char *vstr;
unsigned int vlen;
long long vlong;
// 在AOF文件中,每条RPUSH命令只能添加REDIS_AOF_REWRITE_ITEMS_PER_CMD个元素
// 这里遍历ziplist,将每REDIS_AOF_REWRITE_ITEMS_PER_CMD个元素组装到一条RPUSH命令中去
// 想想为什么要这么做?如果list对象中存在大量的元素,将它们放到一条RPUSH命令中会如何
while(ziplistGet(p,&vstr,&vlen,&vlong)) {
if (count == 0) {
int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}
// 取出元素值并写入rio对象中
if (vstr) {
if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
} else {
if (rioWriteBulkLongLong(r,vlong) == 0) return 0;
}
// 移动迭代器,除以下一个元素
p = ziplistNext(zl,p);
// 取出元素个数加1,如果取出元素个数等于REDIS_AOF_REWRITE_ITEMS_PER_CMD规定的数量
// 则剩余元素放到另一条RPUSH命令中
if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
}
// 处理linked list编码的list对象
else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
list *list = o->ptr;