Skip to content

Commit

Permalink
[fix](logstash) remove ShortNameResolver to solve thread race problem (
Browse files Browse the repository at this point in the history
…apache#44598)

remove ShortNameResolver to solve thread race problem
  • Loading branch information
joker-star-l authored Dec 2, 2024
1 parent 70b0a08 commit 21e1d6d
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 107 deletions.
50 changes: 2 additions & 48 deletions extension/logstash/lib/logstash/outputs/doris.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
require "logstash/outputs/base"
require "logstash/namespace"
require "logstash/json"
require "logstash/util/shortname_resolver"
require 'logstash/util/formater'
require "uri"
require "securerandom"
Expand Down Expand Up @@ -67,8 +66,6 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base

config :save_file, :validate => :string, :default => "failed.data"

config :host_resolve_ttl_sec, :validate => :number, :default => 120

config :max_retries, :validate => :number, :default => -1

config :log_request, :validate => :boolean, :default => true
Expand All @@ -92,10 +89,6 @@ def print_plugin_info()
def register
@http_query = "/api/#{@db}/#{@table}/_stream_load"

@hostnames_pool =
parse_http_hosts(@http_hosts,
ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger))

@request_headers = make_request_headers
@logger.info("request headers: ", @request_headers)

Expand Down Expand Up @@ -141,39 +134,6 @@ def register
print_plugin_info()
end # def register

private

def parse_http_hosts(hosts, resolver)
ip_re = /^[\d]+\.[\d]+\.[\d]+\.[\d]+$/

lambda {
hosts.flat_map { |h|
scheme = URI(h).scheme
host = URI(h).host
port = URI(h).port
path = URI(h).path

if ip_re !~ host
resolver.get_addresses(host).map { |ip|
"#{scheme}://#{ip}:#{port}#{path}"
}
else
[h]
end
}
}
end

private

def get_host_addresses()
begin
@hostnames_pool.call
rescue Exception => ex
@logger.error('Error while resolving host', :error => ex.to_s)
end
end

def multi_receive(events)
return if events.empty?
send_events(events)
Expand All @@ -191,8 +151,6 @@ def send_events(events)
# @logger.info("get event num: #{event_num}")
@logger.debug("get documents: #{documents}")

hosts = get_host_addresses()

http_headers = @request_headers.dup
if !@group_commit
# only set label if group_commit is off_mode or not set, since lable can not be used with group_commit
Expand All @@ -202,7 +160,7 @@ def send_events(events)
req_count = 0
sleep_for = 1
while true
response = make_request(documents, http_headers, hosts, @http_query, hosts.sample)
response = make_request(documents, http_headers, @http_query, @http_hosts.sample)

req_count += 1
response_json = {}
Expand Down Expand Up @@ -246,11 +204,7 @@ def send_events(events)
end

private
def make_request(documents, http_headers, hosts, query, host = "")
if host == ""
host = hosts.pop
end

def make_request(documents, http_headers, query, host)
url = host + query

if @log_request or @logger.debug?
Expand Down
58 changes: 0 additions & 58 deletions extension/logstash/lib/logstash/util/shortname_resolver.rb

This file was deleted.

1 change: 0 additions & 1 deletion extension/logstash/logstash-output-doris.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0"
s.add_runtime_dependency "rest-client", '~> 2.1'

s.add_development_dependency 'logstash-devutils', '~> 1.3'
Expand Down

0 comments on commit 21e1d6d

Please sign in to comment.