Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Fix a stall bug on !follow_inode case #4327

Merged
merged 3 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ def detach_watcher(tw, ino, close_io = true)

tw.close if close_io

if tw.unwatched && @pf
if @pf && tw.unwatched && (@follow_inode || !@tails[tw.path])
target_info = TargetInfo.new(tw.path, ino)
@pf.unwatch(target_info)
end
Expand Down
105 changes: 105 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3017,4 +3017,109 @@ def test_path_resurrection
)
end
end

sub_test_case "Update watchers for rotation without follow_inodes" do
# The scenario where in_tail wrongly unwatches the PositionEntry.
# This is reported in https://github.com/fluent/fluentd/issues/3614.
def test_refreshTW_during_rotation
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt0",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
# In order to detach the old watcher quickly.
"rotate_wait" => "3s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 6, timeout: 15) do
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1")

# This reproduces the following situation:
# `refresh_watchers` is called during the rotation process and it detects the current file being lost.
# Then it stops and unwatches the TailWatcher.
d.instance.refresh_watchers

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to add the new TailWatcher.
# After `rotate_wait` interval, the PositionEntry is unwatched.
# HOWEVER, the new TailWatcher is still using that PositionEntry, so this breaks the PositionFile!!
# That PositionEntry is removed from `PositionFile::map`, but it is still working and remaining in the real pos file.
sleep 5

# Append to the new current log file.
# The PositionEntry is updated although it does not exist in `PositionFile::map`.
# `PositionFile::map`: empty
# Real pos file: `.../tail.txt 0000000000000016 (inode)`
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"}

# Rotate again
[1, 0].each do |i|
FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}")
end
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher.
sleep 3

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"}

# Wait `rotate_wait` for file2 to make sure to close all IO handlers
sleep 3
end

inode_0 = tail_watchers[0]&.ino
inode_1 = tail_watchers[1]&.ino
inode_2 = tail_watchers[2]&.ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"],
tail_watcher_inodes: [inode_0, inode_1, inode_2],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
# The recorded path is old, but it is no problem. The path is not used when using follow_inodes.
["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2],
],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end
end
end