Skip to content

Commit

Permalink
keep Time class untouched
Browse files Browse the repository at this point in the history
  • Loading branch information
archfish committed Nov 25, 2019
1 parent 3120cf0 commit 60b1bb3
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 15 deletions.
6 changes: 3 additions & 3 deletions examples/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
)
)

p_msg = PulsarSdk::Producer::Message.new("dang qian shi jian #{Time.now}")
p_msg = PulsarSdk::Producer::Message.new("dang qian shi jian #{TimeX.now}")

# ++======消息发送后等待系统响应======++
producer.execute(base_cmd, p_msg)

# ++======调用后立即返回,服务器可能还没收到消息======++
producer.execute_async(base_cmd, p_msg)

# ++======发送消息后需要获取消息回执,因为回执与producer_id和request_id有关必须要知道真实的producer才能获取到回执======++
# ++======发送消息后需要获取消息回执,因为回执与producer_id和request_id有关必须要知道真实的producer才能获取到======++
real_producer = producer.real_producer(p_msg)
real_producer.execute(base_cmd, p_msg)
real_producer.receipt
Expand All @@ -39,7 +39,7 @@
)
)
# message will available in 10 second
deliver_at = Time.now + 10
deliver_at = TimeX.now + 10
p_msg = PulsarSdk::Producer::Message.new(
"dang qian shi jian publush at: #{now}, performat at: #{deliver_at}",
Pulsar::Proto::MessageMetadata.new(deliver_at_time: deliver_at.timestamp)
Expand Down
4 changes: 2 additions & 2 deletions lib/pulsar_sdk/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def initialize(proxy_addr, broker_addr = nil, tls_options = nil, auth_provider =
@tls_options = tls_options&.dup
@auth_provider = auth_provider
@socket = nil
@last_data_received_at = Time.now
@last_data_received_at = TimeX.now
@state = Status.new
self.operation_timeout = 30
self.connection_timeout = 5
Expand Down Expand Up @@ -247,7 +247,7 @@ def handle_send_receipt(send_receipt)
end

def set_last_data_received
@last_data_received_at = Time.now
@last_data_received_at = TimeX.now
end

def handle_ping
Expand Down
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/producer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def initialize(msg, metadata = nil)

@metadata ||= Pulsar::Proto::MessageMetadata.new
publish_time = @metadata.publish_time
@metadata.publish_time = publish_time.zero? ? Time.now.timestamp : publish_time
@metadata.publish_time = publish_time.zero? ? TimeX.now.timestamp : publish_time
end

def producer_name=(v)
Expand Down
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/producer/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def route(key, total, delay = 0)

return (@handler.call(key) % total) unless key.to_s.empty?

Murmur3.int32_hash(Time.now.timestamp) % total
Murmur3.int32_hash(TimeX.now.timestamp) % total
end

# 将hash值限制在32位内,防止key过长导致过多内存占用
Expand Down
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/protocol/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Message
define_method "#{x}_at" do
v = self.public_send("#{x}_time").to_i
return if v.zero?
Time.at_timestamp(v)
TimeX.at_timestamp(v)
end
end

Expand Down
1 change: 0 additions & 1 deletion lib/pulsar_sdk/tweaks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@

# 扩展type的判断方法,方便书写,统一以 typeof_ 开头
Pulsar::Proto::BaseCommand.prepend PulsarSdk::Tweaks::BaseCommand
Time.prepend PulsarSdk::Tweaks::TimeAtMicrosecond
7 changes: 6 additions & 1 deletion lib/pulsar_sdk/tweaks/time_at_microsecond.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ def timestamp
module ClassMethods
def at_timestamp(v)
second, micro = v.divmod(1000)
Time.at(second, micro * 1000)
self.at(second, micro * 1000)
end
end
end
end
end

# 扩展默认时间方法,增加毫秒时间戳相关处理
class TimeX < Time
prepend PulsarSdk::Tweaks::TimeAtMicrosecond
end
4 changes: 2 additions & 2 deletions lib/pulsar_sdk/tweaks/timeout_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def pop(timeout = nil)
@received.wait(@mutex)
end
elsif @receive_queue.empty? && timeout != 0
timeout_at = Time.now.to_f + timeout
while @receive_queue.empty? && (res = timeout_at - Time.now.to_f) > 0
timeout_at = TimeX.now.to_f + timeout
while @receive_queue.empty? && (res = timeout_at - TimeX.now.to_f) > 0
@received.wait(@mutex, res)
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/pulsar_sdk/tweaks/wait_map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ def delete(id, timeout = nil)
signal.wait(mutex)
end
elsif @responses.empty? && timeout != 0
timeout_at = Time.now.to_f + timeout
while @responses.empty? && (res = timeout_at - Time.now.to_f) > 0
timeout_at = TimeX.now.to_f + timeout
while @responses.empty? && (res = timeout_at - TimeX.now.to_f) > 0
signal.wait(mutex, res)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/pulsar_sdk/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module PulsarSdk
VERSION = "0.3.2"
VERSION = "0.4.1"
end

0 comments on commit 60b1bb3

Please sign in to comment.