오늘은 Opensearch 데이터 인덱싱을 하는 람다 패키지를 만들어 배포했다.
기존에는 gcp kubernetes 환경에서 logstash를 이용하여 인덱싱을 했었다. 초기 개발비용 감소와 관리 포인트 감소의 장점이 있다 생각해서 lambda에서 인덱싱이 이루어지도록 헸다.
파이썬으로 postgresql에서 데이터 조회한 결과를 처리해서 Opensearch 도큐먼트로 만들어 인덱싱 하는 기능을 구현했다.
두가지 기능이 필요하다. 하나는 데이터가 db에 추가될 때 실시간으로 Opensearch에 인덱싱 해주는 기능. 또 하나는 전체 데이터를 조회하여 대량의 데이터를 인덱싱 하는 기능이다. 두번째 기능은 개발환경이나 테스트 환경을 구성할 때 필요하다.
pychopg2를 이용해 도큐먼트 생성에 필요한 데이터를 가져오는 DatabaseHandler 패키지
import psycopg2
from DatabaseHandler.query import DbQuery
class Retriever:
def __init__(self, host: str, dbname: str, user: str, password: str, port: int, size=1000):
self.connection = psycopg2.connect(
host=host,
dbname=dbname,
user=user,
password=password,
port=port
)
self.cursor = self.connection.cursor()
self.photoQuery = DbQuery.photoQuery
self.authorQuery = DbQuery.authorQuery
self.fetchedAll = True
self.fetchSize = size
def execute_photo_query(self):
self.fetchedAll = False
self.cursor.execute(self.photoQuery)
def fetch_photo(self):
result = self.cursor.fetchmany(self.fetchSize)
if len(result) == 0:
self.fetchedAll = True
return self.process_data(result)
def process_data(self, data):
results = []
column_names = [i[0] for i in self.cursor.description]
for datum in data:
result = {}
for column in zip(column_names, datum):
result[column[0]] = column[1]
results.append(result)
return results
opensearch-py를 이용해 조회해온 데이터로 인덱싱 요청을 하는 패키지
class Retriever:
def __init__(self, host: str, dbname: str, user: str, password: str, port: int, size=1000):
self.connection = psycopg2.connect(
host=host,
dbname=dbname,
user=user,
password=password,
port=port
)
self.cursor = self.connection.cursor()
self.photoQuery = DbQuery.photoQuery
self.authorQuery = DbQuery.authorQuery
self.fetchedAll = True
self.fetchSize = size
def execute_photo_query(self):
self.fetchedAll = False
self.cursor.execute(self.photoQuery)
def fetch_photo(self):
result = self.cursor.fetchmany(self.fetchSize)
if len(result) == 0:
self.fetchedAll = True
return self.process_data(result)
def process_data(self, data):
results = []
column_names = [i[0] for i in self.cursor.description]
for datum in data:
result = {}
for column in zip(column_names, datum):
result[column[0]] = column[1]
results.append(result)
return results
lambda_function
from DatabaseHandler.client import Retriever
from OpensearchHandler.client import Indexer
def lambda_handler(event, context):
indexer = Indexer(
host='',
region='',
http_auth=("", "!"),
index=""
)
retriever = Retriever(
host="",
dbname="",
user="",
password="!",
port=0
)
retriever.execute_photo_query()
while True:
photo_data = retriever.fetch_photo()
if len(photo_data) == 0:
break
result = indexer.request(photo_data)
print(result)
pychopy2 패키지는 아래에서 받았다.
GitHub - jkehler/awslambda-psycopg2
Contribute to jkehler/awslambda-psycopg2 development by creating an account on GitHub.
github.com
Readme에 따르면 아래와 같다.
aws ami에는 Postgresql 라이브러리가 없기 때문에 libpg를 다이나믹 링크로 불러오는 pychopg2는 작동을 안한다고 하며 따로 libpg를 컴파일하여 패키지에 포함시켜서 스태틱 링킹을 해야한다.
내가 할 일은 pychopy2-3.9 폴더를 lambda패키지 루트에 옮겨서 import 하는 것 뿐이다!
그 후 aws 가이드대로 패키지와 작성한 함수를 zip 파일로 묶어서 테라폼으로 배포했다.
python3.9 -m pip install --target ./package opensearch-py
cd package
zip -r ../indexing-lambda.zip .
cd ..
zip -g indexing-lambda.zip lambda_function.py
zip -g indexing-lambda.zip DatabaseHandler/*
zip -g indexing-lambda.zip OpensearchHandler/*
zip -g indexing-lambda.zip psycopg2/*
정상작동!
