
Record data arriving in Firehose can be transformed before it is delivered to the destination (usually S3).

The sample source is at the address below.

https://github.com/jakemraz/aws-kinesisfirehose-convert-lambda

No matter how many times I tried with the default template, I kept getting the error below.

Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed.

{
    "attemptsMade": 4,
    "arrivalTimestamp": 1575281287915,
    "errorCode": "Lambda.FunctionError",
    "errorMessage": "Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed.",
    "attemptEndingTimestamp": 1575281365026,
    "rawData": "ewogICAgImV2ZW50X3R5cGUiOiAiX2NhbXBhaWduLnNlbmQiLAogICAgImV2ZW50X3RpbWVzdGFtcCI6IDE1NzUyODEyODU5NDksCiAgICAiYXJyaXZhbF90aW1lc3RhbXAiOiAxNTc1MjgxMjg2MDczLAogICAgImV2ZW50X3ZlcnNpb24iOiAiMy4xIiwKICAgICJhcHBsaWNhdGlvbiI6IHsKICAgICAgImFwcF9pZCI6ICI2ZGQ1Mjc1MGUxYjU0Mzk3YWQyNThjODgyN2VlMDAwMCIsCiAgICAgICJzZGsiOiB7CiAgICAgICAgCiAgICAgIH0KICAgIH0sCiAgICAiY2xpZW50IjogewogICAgICAiY2xpZW50X2lkIjogIjYyYTA4ZjNlLTNmNzgtNGJjYi04OGY5LTE0YWE0ZDc0MDAwMCIKICAgIH0sCiAgICAiZGV2aWNlIjogewogICAgICAicGxhdGZvcm0iOiB7CiAgICAgICAgCiAgICAgIH0KICAgIH0sCiAgICAic2Vzc2lvbiI6IHsKICAgICAgCiAgICB9LAogICAgImF0dHJpYnV0ZXMiOiB7CiAgICAgICJjYW1wYWlnbl9hY3Rpdml0eV9pZCI6ICI1NDM3MjcyY2Q0MmE0OTczYjEyMDkzMjMyOGRhMDAwMCIsCiAgICAgICJjYW1wYWlnbl9zZW5kX3N0YXR1cyI6ICJTVUNDRVNTIiwKICAgICAgImNhbXBhaWduX3R5cGUiOiBudWxsLAogICAgICAidHJlYXRtZW50X2lkIjogIjAiLAogICAgICAiY2FtcGFpZ25faWQiOiAiZGU2MzdkOTNkMDNhNDkzYTlhMDdkZWZkNjQxNDAwMDAiCiAgICB9LAogICAgImNsaWVudF9jb250ZXh0IjogewogICAgICAiY3VzdG9tIjogewogICAgICAgICJlbmRwb2ludCI6ICJ7XCJDaGFubmVsVHlwZVwiOlwiR0NNXCIsXCJFbmRwb2ludFN0YXR1c1wiOlwiQUNUSVZFXCIsXCJPcHRPdXRcIjpcIk5PTkVcIixcIlJlcXVlc3RJZFwiOlwiMjUxNjYwNjAtNWYxOC00ZDQ3LWJiNTItNjE4NjFhMTEwMDAwXCIsXCJMb2NhdGlvblwiOntcIlBvc3RhbENvZGVcIjpcIlwiLFwiQ2l0eVwiOlwiXCIsXCJSZWdpb25cIjpcIlwiLFwiQ291bnRyeVwiOlwiS09SXCJ9LFwiRGVtb2dyYXBoaWNcIjp7XCJNYWtlXCI6XCJzYW1zdW5nXCIsXCJNb2RlbFwiOlwiU00tTjkzNUtcIixcIlRpbWV6b25lXCI6XCJBc2lhL1Nlb3VsXCIsXCJMb2NhbGVcIjpcImtvX0tSXCIsXCJBcHBWZXJzaW9uXCI6XCIxLjBcIixcIlBsYXRmb3JtXCI6XCJhbmRyb2lkXCIsXCJQbGF0Zm9ybVZlcnNpb25cIjpcIjlcIn0sXCJFZmZlY3RpdmVEYXRlXCI6XCIyMDE5LTEyLTAyVDA3OjI3OjM1LjYyNFpcIixcIkF0dHJpYnV0ZXNcIjp7fSxcIk1ldHJpY3NcIjp7fX0iCiAgICAgIH0KICAgIH0sCiAgICAiYXdzQWNjb3VudElkIjogIjMzMjM0NjUzMDAwMCIKICB9",
    "lambdaArn": "arn:aws:lambda:us-west-2:332346530000:function:FnPinpointCollectOnlyNotificationOpenEvent:$LATEST"
}
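
When debugging a failure record like the one above, it helps to look at what was actually sent. The rawData field appears to be the original payload, Base64-encoded, so a minimal sketch for inspecting it could look like this. The helper name decode_raw_data and the shortened sample record are hypothetical, made up for illustration.

```python
import base64
import json


def decode_raw_data(failure_record: dict) -> dict:
    """Base64-decode the rawData field of a failure record and parse it as JSON."""
    raw_bytes = base64.b64decode(failure_record["rawData"])
    return json.loads(raw_bytes.decode("utf-8"))


# Hypothetical, shortened failure record; the real rawData is much longer.
sample_failure = {
    "errorCode": "Lambda.FunctionError",
    "rawData": base64.b64encode(b'{"event_type": "_campaign.send"}').decode("ascii"),
}

print(decode_raw_data(sample_failure)["event_type"])
```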

So what exactly are you supposed to do?

First, let's look at the parameters that come into the Lambda.

event parameter


Here is what the event parameter passed to the transformation Lambda looks like.

{
  'invocationId': 'b803e137-c4e5-488c-b8f7-df0ad3bd0000',
  'deliveryStreamArn': 'arn:aws:firehose:us-west-2:332346530000:deliverystream/pinpoint_1127',
  'region': 'us-west-2',
  'records': [
    {
      'recordId': '49601810646734916533252110583961812944254908144625385474000000',
      'approximateArrivalTimestamp': 1575284052890,
      'data': 'ewogICJldmVudF90eXBlIjogIl9jYW1wYWlnbi5zZW5kIiwKICAiZXZlbnRfdGltZXN0YW1wIjogMTU3NTI4NDA1MTAxMywKICAiYXJyaXZhbF90aW1lc3RhbXAiOiAxNTc1Mjg0MDUxMTE4LAogICJldmVudF92ZXJzaW9uIjogIjMuMSIsCiAgImFwcGxpY2F0aW9uIjogewogICAgImFwcF9pZCI6ICI2ZGQ1Mjc1MGUxYjU0Mzk3YWQyNThjODgyN2VlMDAwMCIsCiAgICAic2RrIjogewogICAgICAKICAgIH0KICB9LAogICJjbGllbnQiOiB7CiAgICAiY2xpZW50X2lkIjogIjYyYTA4ZjNlLTNmNzgtNGJjYi04OGY5LTE0YWE0ZDc0MDAwMCIKICB9LAogICJkZXZpY2UiOiB7CiAgICAicGxhdGZvcm0iOiB7CiAgICAgIAogICAgfQogIH0sCiAgInNlc3Npb24iOiB7CiAgICAKICB9LAogICJhdHRyaWJ1dGVzIjogewogICAgImNhbXBhaWduX2FjdGl2aXR5X2lkIjogIjY2MzdjNGY2OWE5MzRkOTk5M2Y4NTNkYmQ0ZmQwMDAwIiwKICAgICJjYW1wYWlnbl9zZW5kX3N0YXR1cyI6ICJTVUNDRVNTIiwKICAgICJjYW1wYWlnbl90eXBlIjogbnVsbCwKICAgICJ0cmVhdG1lbnRfaWQiOiAiMCIsCiAgICAiY2FtcGFpZ25faWQiOiAiZGUxODhiMDU2NzQ4NDBhNjhkYzc4OWE1MDgwZTAwMDAiCiAgfSwKICAiY2xpZW50X2NvbnRleHQiOiB7CiAgICAiY3VzdG9tIjogewogICAgICAiZW5kcG9pbnQiOiAie1wiQ2hhbm5lbFR5cGVcIjpcIkdDTVwiLFwiRW5kcG9pbnRTdGF0dXNcIjpcIkFDVElWRVwiLFwiT3B0T3V0XCI6XCJOT05FXCIsXCJSZXF1ZXN0SWRcIjpcIjI1MTY2MDYwLTVmMTgtNGQ0Ny1iYjUyLTYxODYxYTExMDAwMFwiLFwiTG9jYXRpb25cIjp7XCJQb3N0YWxDb2RlXCI6XCJcIixcIkNpdHlcIjpcIlwiLFwiUmVnaW9uXCI6XCJcIixcIkNvdW50cnlcIjpcIktPUlwifSxcIkRlbW9ncmFwaGljXCI6e1wiTWFrZVwiOlwic2Ftc3VuZ1wiLFwiTW9kZWxcIjpcIlNNLU45MzVLXCIsXCJUaW1lem9uZVwiOlwiQXNpYS9TZW91bFwiLFwiTG9jYWxlXCI6XCJrb19LUlwiLFwiQXBwVmVyc2lvblwiOlwiMS4wXCIsXCJQbGF0Zm9ybVwiOlwiYW5kcm9pZFwiLFwiUGxhdGZvcm1WZXJzaW9uXCI6XCI5XCJ9LFwiRWZmZWN0aXZlRGF0ZVwiOlwiMjAxOS0xMi0wMlQwNzoyNzozNS42MjRaXCIsXCJBdHRyaWJ1dGVzXCI6e30sXCJNZXRyaWNzXCI6e319IgogICAgfQogIH0sCiAgImF3c0FjY291bnRJZCI6ICIzMzIzNDY1MzAwMDAiCn0='
    },
    {
      'recordId': '49601810646734916533252110606382551194827824598297346050000000',
      'approximateArrivalTimestamp': 1575284107969,
      'data': 'ewogICAgImV2ZW50X3R5cGUiOiAiX2NhbXBhaWduLnJlY2VpdmVkX2JhY2tncm91bmQiLAogICAgImV2ZW50X3RpbWVzdGFtcCI6IDE1NzUyODQwNTA3NjcsCiAgICAiYXJyaXZhbF90aW1lc3RhbXAiOiAxNTc1Mjg0MDUyNjA3LAogICAgImV2ZW50X3ZlcnNpb24iOiAiMy4xIiwKICAgICJhcHBsaWNhdGlvbiI6IHsKICAgICAgImFwcF9pZCI6ICI2ZGQ1Mjc1MGUxYjU0Mzk3YWQyNThjODgyN2VlMDAwMCIsCiAgICAgICJjb2duaXRvX2lkZW50aXR5X3Bvb2xfaWQiOiAidXMtd2VzdC0yOjQ3ZTdmMTM5LWI4M2EtNDU0NC04NDA5LTQyZTIzNTczMDAwMCIsCiAgICAgICJwYWNrYWdlX25hbWUiOiAia3IuamhiLmF3c19waW5wb2ludF9hbmRyb2lkIiwKICAgICAgInNkayI6IHsKICAgICAgICAibmFtZSI6ICJhd3Mtc2RrLWFuZHJvaWQiLAogICAgICAgICJ2ZXJzaW9uIjogIjIuMTUuMiIKICAgICAgfSwKICAgICAgInRpdGxlIjogImF3c19waW5wb2ludF9hbmRyb2lkIiwKICAgICAgInZlcnNpb25fbmFtZSI6ICIxLjAiLAogICAgICAidmVyc2lvbl9jb2RlIjogIjEiCiAgICB9LAogICAgImNsaWVudCI6IHsKICAgICAgImNsaWVudF9pZCI6ICI2MmEwOGYzZS0zZjc4LTRiY2ItODhmOS0xNGFhNGQ3NDAwMDAiLAogICAgICAiY29nbml0b19pZCI6ICJ1cy13ZXN0LTI6ZGRkNjQ0ODMtZjMwMS00YjVmLTg5NzEtOTdkOTNjMzkwMDAwIgogICAgfSwKICAgICJkZXZpY2UiOiB7CiAgICAgICJsb2NhbGUiOiB7CiAgICAgICAgImNvZGUiOiAia29fS1IiLAogICAgICAgICJjb3VudHJ5IjogIktSIiwKICAgICAgICAibGFuZ3VhZ2UiOiAia28iCiAgICAgIH0sCiAgICAgICJtYWtlIjogInNhbXN1bmciLAogICAgICAibW9kZWwiOiAiU00tTjkzNUsiLAogICAgICAicGxhdGZvcm0iOiB7CiAgICAgICAgIm5hbWUiOiAiYW5kcm9pZCIsCiAgICAgICAgInZlcnNpb24iOiAiOSIKICAgICAgfQogICAgfSwKICAgICJzZXNzaW9uIjogewogICAgICAic2Vzc2lvbl9pZCI6ICIwMDAwMDAwMC0wMDAwMDAwMCIsCiAgICAgICJzdGFydF90aW1lc3RhbXAiOiAwCiAgICB9LAogICAgImF0dHJpYnV0ZXMiOiB7CiAgICAgICJjYW1wYWlnbl9hY3Rpdml0eV9pZCI6ICI2NjM3YzRmNjlhOTM0ZDk5OTNmODUzZGJkNGZkMDAwMCIsCiAgICAgICJpc0FwcEluRm9yZWdyb3VuZCI6ICJmYWxzZSIsCiAgICAgICJ0cmVhdG1lbnRfaWQiOiAiMCIsCiAgICAgICJjYW1wYWlnbl9pZCI6ICJkZTE4OGIwNTY3NDg0MGE2OGRjNzg5YTUwODBlMDAwMCIKICAgIH0sCiAgICAiZW5kcG9pbnQiOiB7CiAgICAgICJDaGFubmVsVHlwZSI6ICJHQ00iLAogICAgICAiQWRkcmVzcyI6ICI9QUJBUThyZm1GT2VaVExDZ1JTYjdpQ3NmTjBrV1FPbE5XNmM1N0xGUVA2K0RaUWV3L1p6eGxjR2VsSTMwZkJqdHZ6MTUyc1BvZ3NEME9uSGxocmZMaXpodXBrN3JyTEVDOEtvZTFyZlpiWmsyd1lOdnQ4d3VyUVNQelJsYkVtY0F1WHZsQzZSQnRyTmI3TlRVUUdIamxIUXUrancrY
2gxN2YvdXByRy9IZVNPUERDQ3lRVGI2U2NBU2FEWmtnR2xsYXA5VUdKcHhibmJWcCt3VTlOUll6c3gvRFRFaVF3Qzh4NFp2MjdTbVZVa0xJK1ZxOEYwTEVGSWlMNWt3OGQ1amJTbk1wM3Mrd1dPV003MGRxZURRcTEwMDAwPT0iLAogICAgICAiRW5kcG9pbnRTdGF0dXMiOiAiQUNUSVZFIiwKICAgICAgIk9wdE91dCI6ICJOT05FIiwKICAgICAgIlJlcXVlc3RJZCI6ICIyNTE2NjA2MC01ZjE4LTRkNDctYmI1Mi02MTg2MWExMTAwMDAiLAogICAgICAiTG9jYXRpb24iOiB7CiAgICAgICAgIlBvc3RhbENvZGUiOiAiIiwKICAgICAgICAiQ2l0eSI6ICIiLAogICAgICAgICJSZWdpb24iOiAiIiwKICAgICAgICAiQ291bnRyeSI6ICJLT1IiCiAgICAgIH0sCiAgICAgICJEZW1vZ3JhcGhpYyI6IHsKICAgICAgICAiTWFrZSI6ICJzYW1zdW5nIiwKICAgICAgICAiTW9kZWwiOiAiU00tTjkzNUsiLAogICAgICAgICJUaW1lem9uZSI6ICJBc2lhL1Nlb3VsIiwKICAgICAgICAiTG9jYWxlIjogImtvX0tSIiwKICAgICAgICAiQXBwVmVyc2lvbiI6ICIxLjAiLAogICAgICAgICJQbGF0Zm9ybSI6ICJhbmRyb2lkIiwKICAgICAgICAiUGxhdGZvcm1WZXJzaW9uIjogIjkiCiAgICAgIH0sCiAgICAgICJFZmZlY3RpdmVEYXRlIjogIjIwMTktMTItMDJUMDc6Mjc6MzUuNjI0WiIsCiAgICAgICJBdHRyaWJ1dGVzIjogewogICAgICAgIAogICAgICB9LAogICAgICAiTWV0cmljcyI6IHsKICAgICAgICAKICAgICAgfSwKICAgICAgIkFwcGxpY2F0aW9uSWQiOiAiNmRkNTI3NTBlMWI1NDM5N2FkMjU4Yzg4MjdlZTAwMDAiLAogICAgICAiSWQiOiAiNjJhMDhmM2UtM2Y3OC00YmNiLTg4ZjktMTRhYTRkNzQwMDAwIiwKICAgICAgIkNvaG9ydElkIjogIjY1IiwKICAgICAgIkNyZWF0aW9uRGF0ZSI6ICIyMDE5LTExLTI3VDA5OjQxOjA0LjExN1oiCiAgICB9LAogICAgImF3c0FjY291bnRJZCI6ICIzMzIzNDY1MzAwMDAiCiAgfQ=='
    }
  ]
}

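invocationId

Appears to be the ID of this transformation invocation.

deliveryStreamArn

Which Firehose delivery stream the records came from.

region

Literally the region.

records

The most important part. This is the data that ultimately has to be parsed. It is covered in detail below.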
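
To make the four top-level fields concrete, here is a small sketch that reads them out of an event. The function name summarize_event and the shortened sample_event (with placeholder record IDs and empty data) are assumptions for illustration, not part of the sample source.

```python
def summarize_event(event: dict) -> dict:
    """Collect the top-level fields of a Firehose transformation event."""
    records = event.get("records", [])
    return {
        "invocationId": event.get("invocationId"),
        # The stream name is the part of the ARN after "deliverystream/".
        "streamName": event.get("deliveryStreamArn", "").split("/")[-1],
        "region": event.get("region"),
        "recordCount": len(records),
        "recordIds": [r["recordId"] for r in records],
    }


# Shortened stand-in for the event shown above (record data left empty).
sample_event = {
    "invocationId": "b803e137-c4e5-488c-b8f7-df0ad3bd0000",
    "deliveryStreamArn": "arn:aws:firehose:us-west-2:332346530000:deliverystream/pinpoint_1127",
    "region": "us-west-2",
    "records": [
        {"recordId": "rec-1", "approximateArrivalTimestamp": 1575284052890, "data": ""},
        {"recordId": "rec-2", "approximateArrivalTimestamp": 1575284107969, "data": ""},
    ],
}

print(summarize_event(sample_event))
```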

record


Each incoming record (an item of event['records']) looks like the following. It has exactly three properties.

{
    'recordId': '49601810646734916533252110403464352372512285256992161794000000',
    'approximateArrivalTimestamp': 1575283637168,
    'data': 'ewogICJldmVudF90eXBlIjogIl9jYW1wYWlnbi5zZW5kIiwKICAiZXZlbnRfdGltZXN0YW1wIjogMTU3NTI4NDA1MTAxMywKICAiYXJyaXZhbF90aW1lc3RhbXAiOiAxNTc1Mjg0MDUxMTE4LAogICJldmVudF92ZXJzaW9uIjogIjMuMSIsCiAgImFwcGxpY2F0aW9uIjogewogICAgImFwcF9pZCI6ICI2ZGQ1Mjc1MGUxYjU0Mzk3YWQyNThjODgyN2VlMDAwMCIsCiAgICAic2RrIjogewogICAgICAKICAgIH0KICB9LAogICJjbGllbnQiOiB7CiAgICAiY2xpZW50X2lkIjogIjYyYTA4ZjNlLTNmNzgtNGJjYi04OGY5LTE0YWE0ZDc0MDAwMCIKICB9LAogICJkZXZpY2UiOiB7CiAgICAicGxhdGZvcm0iOiB7CiAgICAgIAogICAgfQogIH0sCiAgInNlc3Npb24iOiB7CiAgICAKICB9LAogICJhdHRyaWJ1dGVzIjogewogICAgImNhbXBhaWduX2FjdGl2aXR5X2lkIjogIjY2MzdjNGY2OWE5MzRkOTk5M2Y4NTNkYmQ0ZmQwMDAwIiwKICAgICJjYW1wYWlnbl9zZW5kX3N0YXR1cyI6ICJTVUNDRVNTIiwKICAgICJjYW1wYWlnbl90eXBlIjogbnVsbCwKICAgICJ0cmVhdG1lbnRfaWQiOiAiMCIsCiAgICAiY2FtcGFpZ25faWQiOiAiZGUxODhiMDU2NzQ4NDBhNjhkYzc4OWE1MDgwZTAwMDAiCiAgfSwKICAiY2xpZW50X2NvbnRleHQiOiB7CiAgICAiY3VzdG9tIjogewogICAgICAiZW5kcG9pbnQiOiAie1wiQ2hhbm5lbFR5cGVcIjpcIkdDTVwiLFwiRW5kcG9pbnRTdGF0dXNcIjpcIkFDVElWRVwiLFwiT3B0T3V0XCI6XCJOT05FXCIsXCJSZXF1ZXN0SWRcIjpcIjI1MTY2MDYwLTVmMTgtNGQ0Ny1iYjUyLTYxODYxYTExMDAwMFwiLFwiTG9jYXRpb25cIjp7XCJQb3N0YWxDb2RlXCI6XCJcIixcIkNpdHlcIjpcIlwiLFwiUmVnaW9uXCI6XCJcIixcIkNvdW50cnlcIjpcIktPUlwifSxcIkRlbW9ncmFwaGljXCI6e1wiTWFrZVwiOlwic2Ftc3VuZ1wiLFwiTW9kZWxcIjpcIlNNLU45MzVLXCIsXCJUaW1lem9uZVwiOlwiQXNpYS9TZW91bFwiLFwiTG9jYWxlXCI6XCJrb19LUlwiLFwiQXBwVmVyc2lvblwiOlwiMS4wXCIsXCJQbGF0Zm9ybVwiOlwiYW5kcm9pZFwiLFwiUGxhdGZvcm1WZXJzaW9uXCI6XCI5XCJ9LFwiRWZmZWN0aXZlRGF0ZVwiOlwiMjAxOS0xMi0wMlQwNzoyNzozNS42MjRaXCIsXCJBdHRyaWJ1dGVzXCI6e30sXCJNZXRyaWNzXCI6e319IgogICAgfQogIH0sCiAgImF3c0FjY291bnRJZCI6ICIzMzIzNDY1MzAwMDAiCn0='
}

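recordId

The ID of each record. It has to be copied unchanged into the output record.

approximateArrivalTimestamp

The time the record arrived. The sample values look like epoch milliseconds.

data

The Base64-encoded payload. It contains the stream event.

In the end, this value has to be Base64-decoded, the values you want extracted from it, and the result placed in the output.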
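
The decode-then-extract step described above can be sketched roughly as follows. Which fields to keep is my own assumption for illustration (the names come from the Pinpoint events in the samples), and decode_record_data and extract_fields are hypothetical helper names.

```python
import base64
import json


def decode_record_data(record: dict) -> dict:
    """Base64-decode record['data'] and parse the resulting JSON text."""
    raw_bytes = base64.b64decode(record["data"])
    return json.loads(raw_bytes.decode("utf-8"))


def extract_fields(payload: dict) -> dict:
    """Keep only a few fields; the choice of fields is an assumption."""
    return {
        "event_type": payload.get("event_type"),
        "event_timestamp": payload.get("event_timestamp"),
        "campaign_id": payload.get("attributes", {}).get("campaign_id"),
    }


# Hypothetical record with a much smaller payload than the real one.
small_payload = {
    "event_type": "_campaign.send",
    "event_timestamp": 1575284051013,
    "attributes": {"campaign_id": "example-campaign"},
}
sample_record = {
    "recordId": "rec-1",
    "approximateArrivalTimestamp": 1575283637168,
    "data": base64.b64encode(json.dumps(small_payload).encode("utf-8")).decode("utf-8"),
}

print(extract_fields(decode_record_data(sample_record)))
```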

Output


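How do you return the result?

Put each record into a 'records' JSON array and return that.

The important part is that each output record must follow three rules.

See https://docs.aws.amazon.com/ko_kr/firehose/latest/dev/data-transformation.html for the details.

recordId

Use the recordId you received, unchanged.

result

Must be one of Ok, Dropped, or ProcessingFailed.

With Ok or Dropped, Kinesis Firehose considers the record successfully processed; with any other value it considers the record failed.

To exclude a record on purpose, set result to Dropped and the record is left out of the output.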
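
As a rough illustration of choosing between the three result values, the sketch below keeps one event type, drops the rest, and marks unparseable records as failed. The filter value WANTED_EVENT_TYPE and the function name decide_result are assumptions for illustration only.

```python
import base64
import json

# Hypothetical filter: which event type to keep is an assumption.
WANTED_EVENT_TYPE = "_campaign.send"


def decide_result(record: dict) -> str:
    """Return 'Ok', 'Dropped', or 'ProcessingFailed' for one input record."""
    try:
        payload = json.loads(base64.b64decode(record["data"]).decode("utf-8"))
    except (KeyError, ValueError):
        # Missing data, invalid Base64, invalid UTF-8, or invalid JSON.
        return "ProcessingFailed"
    if not isinstance(payload, dict):
        return "ProcessingFailed"
    if payload.get("event_type") != WANTED_EVENT_TYPE:
        # Intentionally excluded from the output.
        return "Dropped"
    return "Ok"


def make_record(text: str) -> dict:
    """Build a hypothetical input record whose data is the Base64 form of text."""
    return {"recordId": "rec-1", "data": base64.b64encode(text.encode("utf-8")).decode("utf-8")}


print(decide_result(make_record('{"event_type": "_campaign.send"}')))
print(decide_result(make_record('{"event_type": "_session.start"}')))
print(decide_result(make_record("not json")))
```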

data

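This is the one that gave me trouble.

What value is supposed to go here? I kept putting unsuitable values in, which is why it kept failing with Invalid output structure.

The short answer: it has to be a Base64-encoded UTF-8 string object.

Note that in Python, base64.b64encode takes a bytes-like object and returns bytes, so the string has to be converted to bytes first and the encoded result converted back to a string.

For more on this, see the related link.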
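
The bytes-versus-string handling above is where I got stuck, so here is a minimal sketch of the round trip using only the standard library. The helper name encode_payload is hypothetical.

```python
import base64
import json


def encode_payload(payload: dict) -> str:
    """Turn a dict into the Base64 string expected in the output 'data' field."""
    text = json.dumps(payload)                    # dict -> str
    raw_bytes = text.encode("utf-8")              # str -> bytes
    encoded_bytes = base64.b64encode(raw_bytes)   # bytes -> Base64 bytes
    return encoded_bytes.decode("utf-8")          # Base64 bytes -> str


encoded = encode_payload({"name": "jakemraz", "age": 13})
print(type(encoded).__name__)

# Decoding reverses the same steps.
print(json.loads(base64.b64decode(encoded).decode("utf-8")))
```

Leaving out the final decode would put a bytes object into the response, which the json module cannot serialize; my guess is that this kind of mistake is what produces the Invalid output structure error.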

Once you have built a JSON array whose items each carry the three properties above, put it under the key records and return it, and the conversion is done.

[
    {
        'name': 'jakemraz',
        'age': 13
    },
    {
        'name': 'beatles',
        'age': 14
    }
]

If you want the data converted to and stored as JSON of the form above, build the following and return it.

{
    'records': [
        {
            'recordId': 'blahblah',
            'result': 'Ok',
            'data': 'ewonbmFtZSc6ICdqYWtlbXJheicsCidhZ2UnOiAxMwp9'  # Base64-encoded
        },
        {
            'recordId': 'blahblah',
            'result': 'Ok',
            'data': 'ewogICduYW1lJzogJ2JlYXRsZXMnLAogICdhZ2UnOiAxNAp9'  # Base64-encoded
        }
    ]
}

Conclusion


Base64-decode each incoming record and apply whatever transformation you need.

Then Base64-encode each result, collect the records into an array, and return it under records.
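
Putting the whole flow together, a handler might look roughly like the sketch below. This is not the code from the sample repository; the transform function and the fields it keeps are assumptions for illustration, and returning the original data for failed records is my own choice here.

```python
import base64
import json


def transform(payload: dict) -> dict:
    """Hypothetical transformation: keep only two fields of the event."""
    return {
        "event_type": payload.get("event_type"),
        "event_timestamp": payload.get("event_timestamp"),
    }


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        try:
            # Base64 string -> bytes -> str -> dict
            payload = json.loads(base64.b64decode(record["data"]).decode("utf-8"))
            text = json.dumps(transform(payload))
            # str -> bytes -> Base64 bytes -> str
            data = base64.b64encode(text.encode("utf-8")).decode("utf-8")
            result = "Ok"
        except (ValueError, AttributeError):
            # Could not decode or transform; hand back the original data.
            data = record["data"]
            result = "ProcessingFailed"
        output.append({"recordId": record["recordId"], "result": result, "data": data})
    return {"records": output}


# Hypothetical local test event with one good and one broken record.
good = base64.b64encode(
    b'{"event_type": "_campaign.send", "event_timestamp": 1, "x": 2}'
).decode("utf-8")
bad = base64.b64encode(b"not json").decode("utf-8")
test_event = {"records": [{"recordId": "a", "data": good}, {"recordId": "b", "data": bad}]}

response = lambda_handler(test_event, None)
print([r["result"] for r in response["records"]])
```

Every input recordId shows up exactly once in the response, and data is always a string, which covers the three rules described earlier.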
