본문 바로가기

Open source

Docker compose를 활용한 Elasticsearch, Kibana, Logstash 구축 및 활용 part2 (for Mac)

들어가기 앞서...

- 아래 링크에서 진행했던, docker compose를 활용한 ELK stack 구축에 이어서 뉴스를 스크래핑해서 색인/검색에 필요한 간단한 기능들을 활용하는 내용이다.

- python을 활용해서 Elasticsearch에 색인/검색하는 법과, logstash에 로그남기고 Elasticsearch에 색인하여 Kibana를 통해 확인해 볼 예정이다.

 

2021/02/23 - [Open source] - Docker compose를 활용한 Elasticsearch, Kibana, Logstash 구축 및 활용 part1 (for Mac)

 

Docker compose를 활용한 Elasticsearch, Kibana, Logstash 구축 및 활용 part1 (for Mac)

들어가기 앞서.. - "내 맘대로 네이버 뉴스 스크래핑"[링크]의 연장선으로 스크래핑한 스케줄링하여 색인/검색할 시스템이 필요해서 Elasticsearch를 이용해 보기로 했다. 검색을 하던 중 보통 ELK(Elas

dong-guri.tistory.com

 

 

Elasticsearch

python에서 elasticsearch 라이브러리를 이용해서 elasticsearch에 데이터를 색인하고 데이터를 검색하는 함수를 활용했다.  색인을하고 구축한 kibana를 통해 데이터를 확인 할 수 있다.

 

from elasticsearch import Elasticsearch, helpers
import datetime


def insert_data(index_name, doc_list):
    es = Elasticsearch('http://0.0.0.0:9200', http_auth=("elastic","123456")) # python에서 localhost안먹히므로, 0.0.0.0에 host ip를 특정해야함
    index=index_name
    

    helpers.bulk(es, doc_list)

if __name__ == '__main__':
    drop_index("test")
    doc1 = {
        "_index" : "test",
        "_source" :{
            "product_name" : "IPHONE 12",
            "c_key" : "1",
            "price" : 13040000,
            "status" : 1,
            "@timestamp" : datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
        }
    }
    doc2 = {
        "_index" : "test",
        "_source" :{
            "product_name" : "APPLE WATCH",
            "c_key" : "2",
            "price" : 3040000,
            "status" : 1,
            "@timestamp" : datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
        }
    }
    doc_list = []
    doc_list.append(doc1)
    doc_list.append(doc2)

    insert_data(index_name="test", doc_list = doc_list)

insert_data 함수를 활용해서 documents를 elasticsearch에 색인 할 수 있다. Elasticsearch객체 생성시에 http_auth에 part1에서의 elasticsearch 정보를 입력하면 된다. 색인 데이터는 doc1과 doc2를 보면 색인할 index 이름과 데이터를 "_index", "_source"로 정하고, "_source"내의 데이터를 json형식으로 일괄적으로 만들면, Elasticsearch에서 json구조를 인식해서 색인한다.

 

위의 코드를 실행하면 아래와 같이 kibana의 Discover탭에서 색인된 데이터를 확인할 수 있다.

test index를 생성해 아이폰12와 애플워치 데이터를 색인한 kibana화면

 

 

색인을 지우거나, 색인내의 문서를 지우거나, 특정 색인내의 문서에서 query를 통해 검색하는 코드는 아래와 같다.

def drop_index(index_name):
    es = Elasticsearch('http://0.0.0.0:9200', http_auth=("elastic","123456"))
    es.indices.delete(index = index_name, ignore=[400,404])


def delete_doc(index_name, query):
    es = Elasticsearch('http://0.0.0.0:9200', http_auth=("elastic","123456"))
    es.delete_by_query(index = index_name, doc_type="_doc", body = query)

def search(index_name, query):
    es = Elasticsearch('http://0.0.0.0:9200', http_auth=("elastic","123456"))
    index = index_name
    body = query
    res = es.search(index=index, body=body)
    return res

 

 

Logstash

python에서 logging을 위해 구축한 logstash에 로그를 보내고 이를 elasticsearch에 색인하고 kibana에서 확인해볼 것이다.

먼저 logging.conf 파일을 아래와 같이 만들어 준다.

##############################################################
## 키 정의
##############################################################
[loggers]
keys = root,console_only,logstash_only

[handlers]
keys = console,logstash

[formatters]
keys = console,logstash

##############################################################
## 로거 정의
##############################################################
[logger_root]
qualname = root
handlers = console,logstash
level = INFO
propagate = 1

[logger_console_only]
qualname = console_only
handlers = console
level = NOTSET
propagate = 0

[logger_logstash_only]
qualname = logstash_only
handlers = logstash
level = NOTSET
propagate = 0

##############################################################
## 핸들러 정의
##############################################################
[handler_console]
class = StreamHandler
level = NOTSET
formatter = console
args = (sys.stdout,)

[handler_logstash]
class = logstash_async.handler.AsynchronousLogstashHandler
level = NOTSET
formatter = logstash
args = ('%(host)s', %(port)s, '%(transport)s')
host = 0.0.0.0
port = 5000
transport = logstash_async.transport.TcpTransport

##############################################################
## 포멧터 정의
##############################################################
[formatter_console]
format = %(asctime)s.%(msecs)03dZ|%(levelname)-8s|%(funcName)s()|%(message)s
datefmt = %Y-%m-%dT%H:%M:%S

[formatter_logstash]
class = logstash_async.formatter.LogstashFormatter
format = format
datefmt = custom-tag
style = True

logging.conf파일 내의 host와 port를 logstash서버와 일치 시켜준다.

 

import logging
import argparse
from logging.config import fileConfig

parser = argparse.ArgumentParser(description='Logging')
parser.add_argument('--logger_name', type=str, default='root')

def main(logger_name):

    if logger_name not in ["root", "logstash_only", "console_only"]:
        raise Exception("logger_name(" + logger_name + ") does not exist")

    fileConfig('logging.conf', disable_existing_loggers=True)
    logger = logging.getLogger(logger_name)
    logger.info("Start")
    try:
        return 1 / 0 ## generate exception
    except Exception as e:
        logger.error(e)
    logger.info("End")

if __name__ == '__main__':
    args = parser.parse_args()
    main(args.logger_name)

logger_name옵션에서 "root"(console과 logstash에 모두 로깅), "logstash_only"(logstash에만 로깅), "console_only"(console에만 로깅) 3가지를 옵션으로 정할 수 있다. 로깅을 위해 0으로 나누는 excepntion을 발생시키는 예제이다.

 

logging_test파일 실행시 console에 로깅 출력

 

이제 kibana를 통해 로그가 제대로 elasticsearch에 색인됐는지 확인해보자. logstash에 대한 색인을 따로 생성하지 않았으면 위의 elasticsearch 예제에서 test색인 밖에 보이지 않을 것이다. 아래와 같이 kibana의 stack management에서 index pattern을 생성해야 색인된 로그를 확인할 수 있다. (logstash pipline내에 conf파일에서 output elasticsearch 옵션 확인)

logstash* index patten을 생성

 

위와 같이 index pattern을 생성하면 logging_test.py에 대한 로그는 아래와 같이 kibana를 통해 로그를 확인할 수 있고, 검색도 가능하다.

python의 logging_test.py에 대한 로그를 kibana 통해 확인 가능

 

 

마무리..

Python을 활용해서 part1에서 구축한 elk stack에 대한 기본적인 활용 예제를 다뤘다. 이제 스케줄링을 통해 매일매일 뉴스 스크래핑을 통해 데이터를 elasticsearch에 색인하고, logstash, kibana를 활용해서 모니터링해볼 예정이다.

관련 프로젝트에 대한 소스는 [링크]를 통해 확인 가능하다.