티스토리 뷰

Glue Crawler를 통한 Catalog 생성


Glue Crawler를 통해 Glue Catalog Table을 생성하면 Athena를 통해 쿼리를 날릴 수 있다.

예를 들어 Pinpoint Stream을 ON 한 후 S3에 쌓은 데이터에 대하여 크롤러를 돌리면 아래와 같은 스키마가 찾아진다.

13번 부터 16번에 있는 partition_0 ~ partition_3 의 경우 year, month, day, hour에 해당하는 partition이다. Athena에서 위의 Schema를 이용하여 쿼리를 사용하려 하면 제대로 되지 않는다.

Athena에서 Glue Catalog를 사용하기

Athena에서 위의 Catalog를 사용하여 쿼리를 날리고자 하는 경우, Edit Schema를 눌러서 Partition에 해당하는 부분을 제거 후 진행해야 한다. 뭔가 지우지 않고 어떻게 잘 사용하는 방법이 있을 것 같은데 어떻게 하는지는 아직 잘 모르겠다. 알게되면 추후 업데이트 하겠음.

Glue Job에서 Glue Catalog 사용하기

Glue Job을 돌리기 위한 Catalog에서는 생성된 Partition Column을 지우면 안된다.

지울 경우 정상적인 동작을 보장할 수 없다. 쿼리 결과가 이상하게 나온다.

Glue Job Code


아래의 Glue Job을 통해 Athena에서 수행한 쿼리 그대로 Glue에서 ETL 작업을 수행 할 수 있다.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import regexp_extract, col

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# SparkContext 생성
sc = SparkContext()
# GlueContext 생성
glueContext = GlueContext(sc)
# SparkSession 생성
spark = glueContext.spark_session
# Job 생성
job = Job(glueContext)
# Job 초기화
job.init(args['JOB_NAME'], args)

# Catalog로 부터 DynamicFrame 생성 후 이를 DataFrame으로 변경 한다.
digital_df = glueContext.create_dynamic_frame.from_catalog(database='digital_campaign', table_name='pinpoint_push', transformation_ctx='digital_df').toDF()

# DataFrame으로 부터 pinpoint라는 이름으로 GlobalTempView를 생성한다.
digital_df.createGlobalTempView("pinpoint")

# 이제부터 global_temp.pinpoint view를 이용하여 Spark SQL을 사용 할 수 있다.
# spark.sql("select * from  global_temp.pinpoint").show()

# 원하는 쿼리를 날려보자
# 아래 쿼리는 리턴된 캠페인 중 노티피케이션을 1넘게 읽은 cliet.client_id를 select 한 후 DataFrame을 생성한다.
client_df = spark.sql("select client.client_id as client_id, count(*) as cnt from global_temp.pinpoint where attributes.campaign_id in ('19fa2cd80e0c4588b6b9f281820f794f', '80610fe9fcbb4db4a45b377da2651416', 'd7330d0c1a7a4153bf9e3a6ded801ed5') and event_type = '_campaign.opened_notification' group by client.client_id having count(*) > 1")

# Output 폴더에 결과를 Overwrite한다. Output 폴더에 있던 모든 내용이 없어지니 주의 할 것.
client_df.write.format('csv').mode('overwrite').save('s3://aws-glue-temporary-332346530000-us-west-2/output')

# Job commit
job.commit()

핵심이 되는 부분은 createGlobalTempView("pinpoint") 이다.

해당 API를 통해 pinpoint라는 view를 만들었고 밑에 있는 spark.sql 구문에서 global_temp.pinpoint 라는 이름으로 pinpoint view에 직접 Spark SQL을 사용 할 수 있었다.

이때 createGlobalTempView가 아닌 createOrReplaceTempView를 생각 해 볼 수 있는데 이 경우는 view가 세션 내에서만 유지 되기 때문에 Glue Job에서는 적절한 선택은 아닌 것 같다.

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/03   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
글 보관함