NIRVANA

[SnowFlake] S3버킷 연동 및 Json 데이터를 관계형 테이블에 로드 본문

DataEngineering

[SnowFlake] S3버킷 연동 및 Json 데이터를 관계형 테이블에 로드

녜잉 2024. 12. 26. 08:22

🪣S3버킷과 SnowFlake 연동하기 

🪣S3 버킷 생성 + 데이터 업로드

아래와 같이 간단하게 yr-s3-project라는 이름을 갖는 bucket을 생성하였다. 

 

S3 > 생성한 버킷 > 권한 > 버킷 정책에서 편집을 눌러 버킷에 대한 정책을 설정해준다.

  • select type of policy를 s3 bucket policy로 설정
  • effect를 allow로 설정
  • principal *로 설정
  • 버킷에 대해 허용할 action으로 putObject하나만 설정
  • arn을 복사하여 붙인 후, /* 을 덧붙임 

add Statement로 상태를 추가하고 Generate Policy를 선택하여 정책을 생성한 뒤, 생성된 정책을 버킷 정책에 붙여 넣는다. 사실 뭔가 필요 없는 것 같은데 공부할 겸 해봄 ㅋ

 

생성한 버킷에 파이썬을 사용하여 json 데이터를 업로드하였다. 사용한 데이터는 영화진흥위원회의 일별 박스오피스 데이터이다. 

https://www.kobis.or.kr/kobisopenapi/homepg/apiservice/searchServiceInfo.do

 

영화진흥위원회 오픈API

제공서비스 영화관입장권통합전산망이 제공하는 오픈API서비스 모음입니다. 사용 가능한 서비스를 확인하고 서비스별 인터페이스 정보를 조회합니다.

www.kobis.or.kr

 

import boto3
from botocore.exceptions import NoCredentialsError

import os
from dotenv import load_dotenv

load_dotenv()

AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")


def upload_to_s3(file_name, bucket_name, object_name=None):
    
    #객체 이름이 없으면 파일 이름 사용
    if object_name is None:
        object_name = f"box_office/{file_name}"
    
    
    s3_client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key = AWS_SECRET_ACCESS_KEY
    )
    
    try:
        s3_client.upload_file(file_name, bucket_name, object_name)
        print("success to upload file in s3 bucket")
    
    except NoCredentialsError:
        print("can't find aws credential")
    
    except Exception as e:
        print(f"fail to upload file in s3 bucket: {e}")
        

if __name__ == "__main__":
    
    file_name = "test/movie_data.json" #파일 이름(경로)
    bucket_name = "yr-s3-project" #s3 버킷 이름 
    object_name = "box_office/movie_data_upload.json" #s3에 저장될 이름 
    
    upload_to_s3(file_name, bucket_name, object_name)

 

잘 업로드 된 것을 확인할 수 있다. 

 

❄️SnowFlake 계정 생성

SnowFlake 계정이 없기 때문에 SnowFlake 계정부터 먼저 생성하였다. 

간단한 기본 정보를 작성하면 30일 동안 400 크레딧의 한도 내에서 SnowFlake를 무료로 사용해볼 수 있다. 

Enterprise 버전을 가장 많이 사용한다고 하여 해당 버전을 선택하고, 사용 클라우드는 AWS를 선택하였다. 

 

이메일 확인 메일을 클릭하면 사용자 이름 - 비밀번호를 등록하라고 나온 후에 다음과 같은 화면이 나온다

 

나는 S3 버킷 안에 있는 데이터를 가지고 와야 하므로 Load data into Snowflake를 선택했다. 그러면 뭔가 SQL 쿼리를 보낼 수 있을 것 같은 곳이 나오게 된다. 

 

 


🪣S3 버킷에 대한 액세스 권한 구성 

1. IAM 정책 생성 

SnowFlake가 폴더 및 하위 폴더의 파일에 액세스 하기 위해 필요한 S3 버킷 및 폴더 권한은 다음과 같다. 

  • s3:GetBucketLocation
  • s3:GetObject
  • s3:GetObjectVersion
  •  s3:ListBucket

따라서 위와 같은 권한을 가진 IAM을 만들어줘야 한다.  IAM > 정책 > 정책 생성으로 들어가 아래와 같은 IAM 정책을 생성한다.   <bucket>/<prefix>/ 경로의 모든 파일에 대해 권한을 준다는 의미이다. 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:GetObjectVersion",
                "s3:DeleteObject",
                "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::<bucket>", 
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "<prefix>/*"
                    ]
                }
            }
        }
    ]
}
  • <bucket> : 권한을 가질 S3 bucket
  • <prefix> : 권한을 가질 S3 오브젝트의 경로 

 

위와 같이 정책을 생성하였다. 

 

2. IAM 역할 생성 

신뢰할 수 있는 엔터티 유형으로 AWS 계정을 선택한다. 이후, 옵션에서 외부 ID 필요의 체크 박스를 클릭하고 외부 ID를 0000으로 설정한다(추후 Integration시 변경 예정) 

 

앞에서 만들어두었던 정책을 추가한다. 

 

다음과 같이 이름을 지정한 뒤 역할을 생성한다. 

 

 

3. Integration 구성 (❄️SnowFlake에서 진행)

아래의 쿼리를 사용하여 Integraion을 구성한다.

  • integration_name : 생성할 Integration 이름
  • iam_role : IAM role의 ARN
  • bucket : 접근할 버킷명
  • path: 접근할 s3 경로
CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')

 

실행 버튼을 누르면 다음과 같이 성공적으로 Integration이 생성된다.  

DESC INTEGRATION s3_snowflake_project;

 

로 생성된 테이블의 스키마를 확인해보면  아래와 같이 나오게 된다. STORAGE_AWS_IAM_USER_ARN, STORAGE_AWS_EXTERNAL_ID 을 사용해야 하므로 해당 property의 value를 기억해둔다. 

 

4. IAM 사용자에게 버킷 액세스 권한 부여

IAM 역할의 상세 페이지의 신뢰 관에서 다음과 같이 신뢰 정책을 수정한다.

  • snowflake_user_arn: STORAGE_AWS_IAM_USER_ARN
  • snowflkae-external_id: STORAGE_AWS_EXTERNAL_ID

STORAGE_AWS_EXTERNAL_ID의 값은 생성한 Integration을 재생성하면 값이 바뀌게 되므로 Integration을 재생성하면 IAM 역할의 신뢰 정책 역시 업데이트 해줘야 한다. 

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<snowflake_user_arn>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<snowflake_external_id>"
        }
      }
    }
  ]
}

 

 

이제 외부 스테이지만 생성하면 S3에 있는 데이터를 가지고 와서 사용할 수 있다! 

 


☃️S3 Json 데이터를 SnowFlake의 관계형 테이블에 로드 

1. 데이터 베이스 생성  및 임시 테이블 생성 

CREATE 문을 사용하여 데이터 베이스를 만든다. 이때, 생성된 데이터베이스에는 public이라는 스키마가 자동으로 포함된다. 

CREATE OR REPLACE DATABASE box_office_data;

 

 

CREATE문을 사용하여 JSON 대상의 테이블을 생성한다. 해당 테이블은 임시 테이블이므로 사용자 세션 기간에만 지속, 다른 사용자에게는 보이지 않는다.

  • VARIANT는 SnowFlake에서 지원하는 반정형 데이터 타입으로 JSON 등의 반정형 데이터를 로딩하고 작동하기 위해 사용된다. 
CREATE OR REPLACE TEMPORARY TABLE my_box_office(json_data VARIANT);

 

 

 

2. 파일 형식 오브젝트 만들기

S3 버킷에서 테이블로 데이터 파일을 로드할 때, 파일의 형식을 설명하고 파일의 데이터를 해석하고 처리하는 방법을 지정해야 한다. 

  • CREATE FILE FORMAT 명령을 실행하여 myjsonfromat 파일 형식을 생성한다. 
    • TYPE = ‘JSON’ : 원본 파일 형식 유형을 나타냄
    • STRIP_OUTER_ARRAY = TRUE : COPY 명령에 대해 데이터를 테이블로 로드할 때 대괄호 [] 를 제외하도록 지시
CREATE OR REPLACE FILE FORMAT bojsonformat
    TYPE = 'JSON'
    STRIP_OUTER_ARRAY = TRUE;

 

Json 데이터의 경우 STRIP_OUTER_ARRAY = TRUE 가 지정되지 않은 상태로 데이터를 읽어들일 경우, 파싱에서 에러가 나므로 주의해야 한다 (이걸로 삽질한 사람한 나야나..)

 

 

3. 스테이지 오브젝트 만들기

스테이지는 파일의 데이터를 테이블에 로드할 수 있도록 데이터 파일이 저장되는 위치, 즉 스테이징 되는 위치를 지정한다.

  • 이때, 명명된 외부 스테이지는 Snowflake에서 관리하는 클라우드 저장소 위치를 의미 → 우리는 S3
  • 외부 스테이지는 S3 버킷에 저장된 데이터 파일을 참조해서 가져오게 됨(아마도?)
CREATE STAGE <stage_name>
  STORAGE_INTEGRATION = <integration_name>
  URL = 's3://<bucket>/<prefix>/'
  FILE_FORMAT = <file_format_name>;
  • stage_name: 스테이지 이름 
  • integration_name: 생성한 Integration 이름
  • file_format_name: 생성한 파일 포맷 이름 

 

4. 데이터를 대상 테이블로 복사

COPY INTO 명령어를 사용하여 Json 데이터를 임시 테이블로 로드한다. 

  • temp_table_name: 앞에서 만든 임시 테이블 이름
  • stage_name: 앞에서 만든 외부 스테이지 이름
  • file_name: s3에 있는 데이터 파일 이름(prefix넣었으므로 폴더 안 넣어도 됨!) 
  • ON_ERROR = 'skip_file' : 오류 나면 파일 건너 뛰어라 
COPY INTO <temp_table_name>
	FROM @<stage_name>/<file_name> 
	ON_ERROR = 'skip_file';

 

테이블을 조회하면 다음과 같이 데이터가 잘 로드 되었음을 확인할 수 있다. 

 

5. RDB 테이블 생성하기 

RDB를 생성하기 이전에 원본 json 데이터를 확인해봐야 한다. 원본 json 데이터의 경우 아래와 같은 형식으로 되어 있다.

 

즉, boxOfficeResult라는 객체 안에 boxofficeType, showRange, dailyBoxOfficeList가 있고, dailyBoxOfficeList의 value인 리스트 안에 다시 영화 정보가 담긴 객체가 나열 되어 있는 형식이다. 

 

해당 json을 관계형 테이블에 담기 위해서는 관계형 테이블을 적절하게 나누는 방식이 필요한데 일단은 제일 간단하게 테이블을 박스 오피스 정보 테이블영화 정보 테이블로 나누었다. PK를 어떤걸로 지정하는 게 좋을까 고민하다가 여기서는 클로드의 도움을 받았다 ㅎ 

 

-- 박스오피스 테이블 
CREATE TABLE BOX_OFFICE_INFO (
    report_date DATE,  -- showRange에서 추출
    boxoffice_type VARCHAR(50),
    show_range VARCHAR(20),
    PRIMARY KEY (report_date)
);

-- 영화 정보 테이블 
CREATE OR REPLACE TABLE DAILY_BOX_OFFICE (
    report_date DATE,
    rnum INTEGER,
    rank INTEGER,
    rank_inten INTEGER,
    rank_old_and_new VARCHAR(3),
    movie_cd VARCHAR(10),
    movie_nm VARCHAR(200),
    open_dt DATE,
    sales_amt DECIMAL(15,0),
    sales_share DECIMAL(15,1),
    sales_inten DECIMAL(15,0),
    sales_change DECIMAL(15,1),
    sales_acc DECIMAL(15,0),
    audi_cnt INTEGER,
    audi_inten INTEGER,
    audi_change DECIMAL(15,1),
    audi_acc INTEGER,
    scrn_cnt INTEGER,
    show_cnt INTEGER,
    PRIMARY KEY (report_date, movie_cd),
    FOREIGN KEY (report_date) REFERENCES BOX_OFFICE_INFO(report_date)
);

 

박스 오피스 테이블의 경우 showRange에서 추출한 report_datePK가 되고, 영화 정보 테이블report_date과 movie_cd를 합친 복합키PK로 사용하게 된다. 또한 영화 정보 테이블은 report_date을 FK로 부모 테이블인 박스 오피스 테이블을 참조한다. 

 

 

6. 임시 테이블에서 RDB 테이블로 데이터 삽입 

임시 테이블에서 데이터를 선택하여 생성한 RDB 테이블에 삽입한다. 

  • my_box_office 테이블의 json_data열에서 boxOfficeResult.showRange 값을 추출한 후, 앞의 1~8자리 까지만 잘라 낸 후(substring 함수) 이를 날짜 형식으로 변환
  • my_box_office 테이블의 json_data열에서 boxOfficeResult.boxofficeType값을 추출한 후, 문자열로 변환
  • my_box_office 테이블의 json_data열에서 boxOfficeResult.showRange값을 추출한 후, 문자열로 변환 
--variant 컬럼에서 테이블로 insert
INSERT INTO BOX_OFFICE_INFO (report_date, boxoffice_type, show_range)
SELECT 
    TO_DATE(SUBSTRING(json_data:boxOfficeResult.showRange, 1, 8), 'YYYYMMDD'),
    json_data:boxOfficeResult.boxofficeType::STRING,
    json_data:boxOfficeResult.showRange::STRING
FROM my_box_office;

 

SELECT 문을 사용해서 조회해보면 다음과 같이 데이터가 잘 넣어진 것을 확인할 수 있다. 

 

 

영화 정보 테이블 역시 같은 방법으로 진행한다.  my_box_office의 각 행에 대해 json_data 컬럼에서 dailyBoxOfficeList를 추출하고, 추출된 JSON 배열을 행 단위로 펼친다. 만약 하나의 JSON 배열에 10개의 객체가 있다면 각 개체가 개별 행으로 처리 될 수 있게 한다. 

  • LATERAL : FLATTEN 작업이 각 행의 JSON 데이터에 대해 실행되도록 설정
    • 현재 행의 데이터를 기반으로 동작하는 서브쿼리나 함수에서 사용
  • FLATTEN(input => ...) : 배열을 펼쳐서 각 요소를 개별 행으로 만듦
INSERT INTO DAILY_BOX_OFFICE
SELECT 
    TO_DATE(SUBSTRING(json_data:boxOfficeResult.showRange, 1, 8), 'YYYYMMDD'),
    d.value:rnum::INTEGER,
    d.value:rank::INTEGER,
    d.value:rankInten::INTEGER,
    d.value:rankOldAndNew::STRING,
    d.value:movieCd::STRING,
    d.value:movieNm::STRING,
    TO_DATE(d.value:openDt::STRING, 'YYYY-MM-DD'),
    d.value:salesAmt::DECIMAL(15,0),
    d.value:salesShare::DECIMAL(15,1),
    d.value:salesInten::DECIMAL(15,0),
    d.value:salesChange::DECIMAL(15,1),
    d.value:salesAcc::DECIMAL(15,0),
    d.value:audiCnt::INTEGER,
    d.value:audiInten::INTEGER,
    d.value:audiChange::DECIMAL(15,1),
    d.value:audiAcc::INTEGER,
    d.value:scrnCnt::INTEGER,
    d.value:showCnt::INTEGER
FROM my_box_office,
LATERAL FLATTEN(input => json_data:boxOfficeResult.dailyBoxOfficeList) d;

 

역시나 SELECT 문으로 조회해보면 잘 삽입 된 것을 확인할 수 있다. 

 

 

 


나에게 주어진 과제를 끝냈으니까..

이제 진짜 SnowFlake 공부해야겠다 .. 

 

 

참고:

https://docs.snowflake.com/ko/user-guide/data-load-external-tutorial-prerequisites

 

전제 조건 | Snowflake Documentation

Snowflake는 공용 S3 버킷에서 스테이징된 샘플 데이터 파일을 제공합니다. 참고 일반적인 사용에서는 AWS 관리 콘솔, AWS 명령줄 인터페이스 또는 이에 상응하는 클라이언트 애플리케이션을 사용하

docs.snowflake.com

https://velog.io/@ujeongoh/Amazon-S3%EC%97%90-%EC%95%A1%EC%84%B8%EC%8A%A4%ED%95%98%EB%8F%84%EB%A1%9D-Snowflake-%EC%84%A4%EC%A0%95%ED%95%98%EA%B8%B0

 

Amazon S3에 액세스하도록 Snowflake 설정하기

설정을 하기에 앞서 Snowflake의 Integration에 대해 알아보자. 일반적으로 Snowflake에서 데이터를 가져오거나 클라우드 서비스(예: S3)와 통합하기 위해 인증 정보를 사용해야 할 때, 해당 클라우드 공

velog.io