Skip to content

Latest commit

 

History

History
406 lines (321 loc) · 17.5 KB

006.Log_Data_Practice_2.md

File metadata and controls

406 lines (321 loc) · 17.5 KB

006. Log Data Practice 2 - 지리적 데이터가 포함된 Log Data 가공 후 Elasticsearch 저장하기

해당 실습에서는 Elastic Stack 환경 구축에 대한 내용은 포함하고 있지 않습니다.

00. 실습환경

  • Windows 10
  • Elasticsearch 7.10.2
  • Logstash 7.10.2
  • Kibana 7.10.2
  • Filebeat 7.10.2
  • Git Bash

Elasticsearch와 Kibana가 실행된 상태로 아래 실습을 진행해주세요.

01. 데이터 준비하기

실습을 진행하기 위해 임의로 생성한 로그 파일을 활용하도록 하겠습니다. 로그 파일은 geo.log이며, 지난 실습(005. Log Data 가공 후 Elasticsearch 저장하기) 파일인 rest.log와 유사한 형태로 로그 파일을 구성하였습니다. 이 때, 좌표를 최대한 다양한 형태로 구성하였습니다.

02. filebeat 설정

먼저, geo.log를 읽어와 전달할 수 있도록 filebeat.yml을 설정합니다. 여기서 사용한 fields_under_root 필드를 true로 지정 시, fields에 작성한 커스텀 필드를 탑 레벨 필드로 설정합니다. default 설정은 false로 되어 있습니다. 그 외 설정에 대한 설명은 생략하며, 궁금하시다면 지난 실습 내용을 참고하시길 바랍니다.

- type: log
  enabled: true
  paths:
     - E:/personal/Workspace/TIL/practice/elastic-stack-posting/post006/geo.log
  <<: *multiline_defaults
  fields:
    application: practice02
    log_type: geo
  fields_under_root: true

설정을 완료하면 filebeat가 설치되어 있는 디렉터리에서 아래 명령어를 사용해 filebeat를 실행합니다.

./filebeat.exe -e

03. logstash 설정

이번에는 logstash를 설정하도록 하겠습니다. 먼저 pipelines.yml을 설정합니다.

- pipeline.id: practicepipe
  path.config: "../config/practice.conf"

그 다음 log 파일을 정상적으로 불러오는지 확인하기 위해, 지난 실습을 참고하여 practice.conf파일에 기본 filter를 작성하였습니다.

input {
	beats {
		port => 5044
	}
}

filter {
	if [application] == "practice02" {
		if [log_type] == "geo" {
			dissect {
				mapping => {
					"message" => "%{log_datetime} %{+log_datetime} %{} %{level} [%{code_path}] %{message}"
				}
			}
			
			mutate {
				strip => ["message"]
			}

			if [message] =~ /^status=\[REQUEST\]/ {
				grok {
					match => { "message" => "^(?m)%{GREEDYDATA:request_info} data=\[%{GREEDYDATA:data}\]" }
				}
				kv {
					source => "request_info"
					field_split => " "
					value_split => "="
					target => "request_info"
				}
				json {
					source => "data"
					target => "[request_info][data]"
				}
				mutate {
					remove_field => ["data"]
				}
			}
		}
		
		date {
			match => ["log_datetime", "YYYY-MM-dd HH:mm:ss.SSS", "dd/MMM/YYYY:HH:mm:ss"]
			target => "datetime"
			timezone => "Asia/Seoul"
		}
	}
}

output {
	if [application] == "practice02" {
		elasticsearch {
			hosts => ["localhost:9200"]
			index => "practice_02"
		}
	}
}

설정을 완료하면, logstash를 실행합니다.

./logstash.bat

그 다음 아래 명령어를 Kibana > DevTools에서 실행하여 정상 생성되었는지 확인해봅시다.

GET practice_02/_search

현재 geo.log는 총 15줄의 로그가 존재하는데, 실행 결과 14개의 document만 생성된 것을 확인할 수 있습니다.

image

생성이 되지 않은 도큐먼트를 확인해보니, 중복으로 생성되는 필드(location)의 타입이 달라 발생한 것이었습니다.

2023-07-18 00:01:12.362 1 DEBUG [DefaultGlobalFilter.java:lambda$filter$3():68] status=[REQUEST] key=[userkey] request_id=[requestid] host=[api.test.com] client_ip=[123.456.789.12] method=[GET] user_agent=[Mozilla/5.0 (Linux; Android 13; SM-G991N Build/TP1A.220624.014;) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/111.0.5563.58 Mobile Safari/537.36 android/GA_Android/_ga_cid=null;_ga_adid=null;; android/GA_Android/_ga_adid=00000000-0000-0000-0000-000000000000;_ga_cid=gacid;;] accept=[*/*] content_type=[*/*] service=[TEST01] url=[/test01] http_status=[100] data=[{"key":"userkey","parameter":"value","location":"37.5557651,126.9458323"}]
2023-07-18 00:04:51.949 1 DEBUG [DefaultGlobalFilter.java:lambda$filter$2():80] status=[REQUEST] key=[userkey] request_id=[requestid] host=[api.test.com] client_ip=[123.456.789.12] method=[POST] user_agent=[Mozilla] accept=[null] content_type=[application/json] service=[TEST04] url=[/test04] http_status=[100] data=[{"key":"userkey","location":{"lng":126.993056, "lat":37.533531}}]

첫 번째 로그의 location은 text 타입인데, 두 번째 로그의 location은 object 타입입니다. 두 필드는 모두 지리적 데이터를 저장하고 있으며 동일 타입으로 저장될 수 있도록 logstash에서 처리하도록 하겠습니다.

04. geo-point 타입 적용할 수 있게 데이터 변환하기

geo.log를 살펴보면 지리에 대한 정보가 다양한 필드 이름으로 들어오며, 동일 이름이더라도 다른 타입임을 알 수 있습니다. 우리가 보기에는 위도와 경도의 정보를 담은 것으로 보이나, 실제로 elasticsearch에 지리적 데이터라는 명시적인 표시를 주기 위해서는 geo-point 타입이나 geo-shape 타입으로 저장해야 합니다. 그러지 않으면 kibana 대시보드 내에서 지도 위에 데이터를 표출하기 어렵습니다. 또한, geo-point 타입이나 geo-shape 타입으로 매핑하려면 정해진 규칙에 맞춰 데이터를 저장해야 합니다.

elasticsearch에 geo-point 타입으로 매핑하기 위해서는 데이터를 다음과 같이 구성해야 합니다.

  • 문자열의 경우, 위도와 경도 순서로 되어 있어야 함.
    e.g. "37.5557651,126.9458323"
  • 배열의 경우, 경도와 위도 순서로 되어 있어야 함.
    e.g. [127.290466,36.893677]
  • object 타입으로 추가할 경우, 위도는 lat으로, 경도는 lon으로 필드명을 생성해야 함.
    e.g. "location": {"lat": 37.5557651,"lon": 126.9458323}

elasticsearch에 geo-shape 타입으로 매핑하기 위해서는 데이터를 다음과 같이 구성해야 합니다.

  • coordinates 필드와 type 필드만 존재
  • coordinates 필드는 경도와 위도 순서로 되어 있어야 함. e.g. [127.290466,36.893677]
  • type 필드는 소문자로만 구성되어 있어야 함. ⇒ 참고

geo.log의 데이터를 타입별로 살펴보면 다음과 같은 형태로 되어 있습니다.

elasticsearch type ruby type description field
geo-point String 위도와 경도를 순서대로 적은 문자열 location, latlng, origin, destination, coords
위도와 경도를 순서대로 적은 문자열 모음,;로 구분 waypoints
위도와 경도를 순서대로 적은 문자열 모음,` `로 구분
Hash 위도(lat)와 경도(lng)로 구성된 object location, origin, destination, start, goal
위도(YPos)와 경도(XPos)로 구성된 object StartPoint, GoalPoint
위도(y)와 경도(x)로 구성된 object StartPoint, GoalPoint, SPoint, EPoint
Array 위도(lat)와 경도(lng)로 구성된 object의 배열 origins, destinations, waypoints
위도(YPos)와 경도(XPos)로 구성된 object의 배열 ViaPoint, ViaPoints
위도(y)와 경도(x)로 구성된 object의 배열 SPointList
geo-shape - type과 coordinates로 구성된 object coordinates

데이터의 형태가 달라 변환이 복잡하므로, ruby 플러그인을 사용해 ruby 코드를 구현하여 변환하였습니다.

# ...

filter {  
  if [application] == "practice02" {
    if [log_type] == "geo" {
      # ...

      if [message] =~ /^status=\[REQUEST\]/ {
        # ...

        mutate {
          remove_field => ["data"]
        }
				
				ruby {
          code => '
            def convert_to_location(event, field)
              value = event.get("[request_info][data][#{field}]")
              if value.is_a?(String)
                coordinates = value.split(/[|;]/).map { |coord| coord.split(",") }
                points = coordinates.map do |coords|
                  lng = coords[1].to_f
                  lat = coords[0].to_f
                  { "lon" => lng, "lat" => lat }
                end
                event.set("[request_info][data][#{field}]", points)
              elsif value.is_a?(Array)
                points = value.map do |point|
                  lng = point["lng"] || point["lon"] || point["x"] || point["XPos"]
                  lat = point["lat"] || point["y"] || point["YPos"]
                  if lng.is_a?(Numeric) && lat.is_a?(Numeric)
                    { "lon" => lng, "lat" => lat }
                  else
                    point
                  end
                end
                event.set("[request_info][data][#{field}]", points)
              elsif value.is_a?(Hash)
                lng = value["lng"] || value["lon"] || value["x"] || value["XPos"]
                lat = value["lat"] || value["y"] || value["YPos"]
                if lng.is_a?(Numeric) && lat.is_a?(Numeric)
                  event.set("[request_info][data][#{field}]", { "lon" => lng, "lat" => lat })
                end
              end
            end

            # Convert latitude and longitude fields to geo-point
            ["location", "latlng", "origin", "origins", "destination", "destinations", "waypoints", "StartPoint", "GoalPoint", "ViaPoint", "ViaPoints", "SPoint", "EPoint", "SPointList", "points", "start", "goal", "coords"].each do |field|
              convert_to_location(event, field)
            end
          '
        }
      }
    }
    
    # ...
  }
}
# ...

지정한 필드에 대해서만 convert_to_location() 메서드를 실행합니다. 타입에 따라 다 다르게 동작하며, tring 타입은 구분자(|, ;)를 기준으로 배열을 만들어 동작하도록 구성하였습니다.

코드 작성을 완료하면, 설정을 적용하기 위해 logstash와 filebeat를 재실행합니다. 이 때, logstash에서 생성한 인덱스와 filebeat data 디렉터리를 제거하고 진행해주세요!

그 다음 Kibana > DevTools에서 아래 명령어를 실행하여 정상 생성되었는지 확인해봅시다.

GET practice_02/_search
{
  "_source": "request_info", 
  "size": 15
}

명령어를 실행해보면 15개가 생성되었으며, 데이터를 살펴보면 좌표는 lonlat을 사용하여 표출하도록 변환된 것을 확인할 수 있습니다.

image

05. Elasticsearch Index Template 생성하기

마지막으로 데이터가 geo-point 타입이나 geo-shape 타입으로 저장될 수 있도록 elasticsearch에 인덱스 템플릿을 생성해 봅시다. mappings 설정을 통해 필드별로 타입을 지정할 수 있으며, settings는 인덱스에 대한 설정을 지정할 수 있습니다.

PUT _index_template/practice_template
{
    "index_patterns": ["practice_*"],
    "priority": 1,
    "template": {
        "settings": {
            "number_of_shards": 3,
            "number_of_replicas": 1
        },
        "mappings": {
          "properties": {
            "request_info": {
              "properties": {
                "data": {
                  "properties": {
                    "location": {"type": "geo_point"},
                    "latlng": {"type": "geo_point"},
                    "origin": {"type": "geo_point"},
                    "origins": {"type": "geo_point"},
                    "destination": {"type": "geo_point"},
                    "destinations": {"type": "geo_point"},
                    "waypoints": {"type": "geo_point"},
                    "StartPoint": {"type": "geo_point"},
                    "GoalPoint": {"type": "geo_point"},
                    "ViaPoint": {"type": "geo_point"},
                    "ViaPoints": {"type": "geo_point"},
                    "SPoint": {"type": "geo_point"},
                    "EPoint": {"type": "geo_point"},
                    "SPointList": {"type": "geo_point"},
                    "points": {"type": "geo_point"},
                    "start": {"type": "geo_point"},
                    "goal": {"type": "geo_point"},
                    "coords": {"type": "geo_point"},
                    "geometry": {"type": "geo_shape"}
                  }
                }
              }
            }
          }
        }
    }
}

인덱스 템플릿을 생성한 다음 새로 만들어지는 인덱스들만 영향을 받고, 이미 존재하던 인덱스에는 적용되지 않습니다. 그러므로 logstash에서 생성한 인덱스를 제거한 다음, logstash를 다시 실행합니다. 이 때, filebeat의 경우, data 디렉터리를 제거하고 재실행해주세요.

정상적으로 매핑되었는지 mapping API를 통해 확인해봅시다.

GET practice_02/_mapping

위 명령어를 실행하면 다음과 같이 practice_02 인덱스에 대한 매핑된 정보가 표출되며, 정상적으로 매핑되었음을 확인할 수 있습니다.

{
  "practice_02" : {
    "mappings" : {
      "properties" : {
        // ...
        "request_info" : {
          "properties" : {
            // ...
            "data" : {
              "properties" : {
                "EPoint" : {"type" : "geo_point"},
                "GoalPoint" : {"type" : "geo_point"},
                "SPoint" : {"type" : "geo_point"},
                "SPointList" : {"type" : "geo_point"},
                "StartPoint" : {"type" : "geo_point"},
                "ViaPoint" : {"type" : "geo_point"},
                "ViaPoints" : {"type" : "geo_point"},
                "coords" : {"type" : "geo_point"},
                "destination" : {"type" : "geo_point"},
                "destinations" : {"type" : "geo_point"},
                "geometry" : {"type" : "geo_shape"},
                "goal" : {"type" : "geo_point"},
                "latlng" : {"type" : "geo_point"},
                "location" : {"type" : "geo_point"},
                "origin" : {"type" : "geo_point"},
                "origins" : {"type" : "geo_point"},
                "points" : {"type" : "geo_point"},
                "start" : {"type" : "geo_point"},
                "waypoints" : {"type" : "geo_point"}
              }
            },
            // ...
          }
        },
        // ...
      }
    }
  }
}

06. Kibana Maps로 좌표 표출하기

Kibana Maps로 표출하기 위해서는 먼저 Kibana 인덱스 패턴을 생성해야 합니다. 인덱스 패턴 생성을 위해 Stack Management > Kibana > Index patterns으로 이동합니다.

image

그 다음 Create index pattern 버튼을 누르면, 다음과 같은 화면이 표시됩니다.

image

Index pattern name으로 practice_*을 입력한 다음 Next step 버튼을 클릭합니다. 그 다음 표시되는 화면에서 아래와 같이 Time fielddatetime으로 설정하고 Create index pattern을 클릭하면 인덱스 패턴 생성이 완료됩니다.

image

좌표를 지도 위에 시각화하기 위해 Kibana > Maps로 이동합니다.

image

먼저 날짜를 로그 시간대에 맞게 변경해줍니다.

image

그 다음 Add layer를 클릭한 다음, Elasticsearch > Documents를 선택합니다.

image

그러면 다음과 같이 Index pattern 선택 화면이 표시됩니다. 여기서 이전에 생성한 practice_*을 선택합니다.

image

그 다음 Geospatial field를 원하는 필드로 선택하고 Add layer를 클릭합니다.

image

그러면 다음과 같이 상세 설정 화면이 표시되는데, 여기서는 Name만 설정하도록 하겠습니다. 필요한 설정 입력 후 Save & close를 누르면 지도 위에 좌표가 표출됩니다.

image


참고