diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl index 69d4b29f29e..55fd0bd3b7c 100644 --- a/deps/rabbit/src/rabbit_msg_store.erl +++ b/deps/rabbit/src/rabbit_msg_store.erl @@ -80,8 +80,6 @@ current_file_handle, %% current write file offset current_file_offset, - %% messages that were potentially removed from the current write file - current_file_removes = [], %% TRef for our interval timer sync_timer_ref, %% files that had removes @@ -1166,11 +1164,7 @@ write_message(MsgId, Msg, CRef, end, CRef, State1) end. -remove_message(MsgId, CRef, - State = #msstate{ - index_ets = IndexEts, - current_file = CurrentFile, - current_file_removes = Removes }) -> +remove_message(MsgId, CRef, State = #msstate{ index_ets = IndexEts }) -> case should_mask_action(CRef, MsgId, State) of {true, _Location} -> State; @@ -1182,32 +1176,20 @@ remove_message(MsgId, CRef, %% ets:lookup(FileSummaryEts, File), State; {_Mask, #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize } = Entry} + total_size = TotalSize }} when RefCount > 0 -> %% only update field, otherwise bad interaction with %% concurrent GC + ok = index_update_ref_counter(IndexEts, MsgId, -1), case RefCount of %% Don't remove from cur_file_cache_ets here because %% there may be further writes in the mailbox for the - %% same msg. We will remove 0 ref_counts when rolling - %% over to the next write file. - 1 when File =:= CurrentFile -> - index_update_ref_counter(IndexEts, MsgId, -1), - State1 = State#msstate{current_file_removes = - [Entry#msg_location{ref_count=0}|Removes]}, - delete_file_if_empty( - File, gc_candidate(File, - adjust_valid_total_size( - File, -TotalSize, State1))); - 1 -> - index_delete(IndexEts, MsgId), - delete_file_if_empty( - File, gc_candidate(File, - adjust_valid_total_size( - File, -TotalSize, State))); - _ -> - index_update_ref_counter(IndexEts, MsgId, -1), - gc_candidate(File, State) + %% same msg. + 1 -> delete_file_if_empty( + File, gc_candidate(File, + adjust_valid_total_size( + File, -TotalSize, State))); + _ -> gc_candidate(File, State) end end. @@ -1269,9 +1251,7 @@ flush_or_roll_to_new_file( cur_file_cache_ets = CurFileCacheEts, file_size_limit = FileSizeLimit }) when Offset >= FileSizeLimit -> - %% Cleanup the index of messages that were removed before rolling over. - State0 = cleanup_index_on_roll_over(State), - State1 = internal_sync(State0), + State1 = internal_sync(State), ok = writer_close(CurHdl), NextFile = CurFile + 1, {ok, NextHdl} = writer_open(Dir, NextFile), @@ -1299,8 +1279,6 @@ write_large_message(MsgId, MsgBodyBin, index_ets = IndexEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> - %% Cleanup the index of messages that were removed before rolling over. - State1 = cleanup_index_on_roll_over(State0), {LargeMsgFile, LargeMsgHdl} = case CurOffset of %% We haven't written in the file yet. Use it. 0 -> @@ -1320,13 +1298,13 @@ write_large_message(MsgId, MsgBodyBin, ok = index_insert(IndexEts, #msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile, offset = 0, total_size = TotalSize }), - State2 = case CurFile of + State1 = case CurFile of %% We didn't open a new file. We must update the existing value. LargeMsgFile -> [_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - State1; + State0; %% We opened a new file. We can insert it all at once. %% We must also check whether we need to delete the previous %% current file, because if there is no valid data this is @@ -1337,7 +1315,7 @@ write_large_message(MsgId, MsgBodyBin, valid_total_size = TotalSize, file_size = TotalSize, locked = false }), - delete_file_if_empty(CurFile, State1 #msstate { current_file_handle = LargeMsgHdl, + delete_file_if_empty(CurFile, State0 #msstate { current_file_handle = LargeMsgHdl, current_file = LargeMsgFile, current_file_offset = TotalSize }) end, @@ -1352,22 +1330,11 @@ write_large_message(MsgId, MsgBodyBin, %% Delete messages from the cache that were written to disk. true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), %% Process confirms (this won't flush; we already did) and continue. - State = internal_sync(State2), + State = internal_sync(State1), State #msstate { current_file_handle = NextHdl, current_file = NextFile, current_file_offset = 0 }. -cleanup_index_on_roll_over(State = #msstate{ - index_ets = IndexEts, - current_file_removes = Removes}) -> - lists:foreach(fun(Entry) -> - %% We delete objects that have ref_count=0. If a message - %% got its ref_count increased, it will not be deleted. - %% We thus avoid extra index lookups to check for ref_count. - index_delete_object(IndexEts, Entry) - end, Removes), - State#msstate{current_file_removes=[]}. - contains_message(MsgId, From, State = #msstate{ index_ets = IndexEts }) -> MsgLocation = index_lookup_positive_ref_count(IndexEts, MsgId), gen_server2:reply(From, MsgLocation =/= not_found), @@ -2178,9 +2145,9 @@ truncate_file(File, Size, ThresholdTimestamp, #gc_state{ file_summary_ets = File -spec delete_file(non_neg_integer(), gc_state()) -> ok | defer. -delete_file(File, #gc_state { file_summary_ets = FileSummaryEts, - file_handles_ets = FileHandlesEts, - dir = Dir }) -> +delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, + dir = Dir }) -> case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of {[_|_], _Cont} -> ?LOG_DEBUG("Asked to delete file ~p but it has active readers. Deferring.", @@ -2189,6 +2156,7 @@ delete_file(File, #gc_state { file_summary_ets = FileSummaryEts, _ -> [#file_summary{ valid_total_size = 0, file_size = FileSize }] = ets:lookup(FileSummaryEts, File), + [] = scan_and_vacuum_message_file(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), true = ets:delete(FileSummaryEts, File), ?LOG_DEBUG("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),