NIRVANA
[Apache Hadoop] 하둡(Hadoop)이란? 본문
하둡(Hadoop)이란?
- High-Availability Distributed Object-Oriented Platform
- 빅데이터를 저장, 처리, 분석할 수 있는 자바 소프트웨어 프레임워크
- 대용량의 데이터를 클러스터에서 병렬로 동시에 처리하여 처리 속도를 높이는 것을 목적으로 하는 분산처리를 위한 오픈 소스 프레임워크
하둡 개발 배경
- 야후(Yahoo!)의 더그 커팅이 검색 엔진을 개발하는 과정에서 대용량의 비정형 데이터를 기존의 RDB 기술로 처리하는 데에 한계를 발견
- 구글에서 발표한 GFS와 MapReduce 관련 논문을 참고하여 개발
- 이후 Apache 재단으로 프로젝트가 넘어가 오픈 소스로 공개됨
💡구글 GFS와 MapReduce
GFS(Google File System, 구글 파일 시스템)
- 구글 파일 시스템은 급속히 늘어나는 구글의 데이터 처리를 위해 설계된 대용량 분산 파일 시스템
구글 파일 시스템 구성
- 마스터(Master) - GFS 전체의 상태를 관리하고 통제하는 중앙 서버
- 청크 서버(Chunk Server) - 물리적인 하드 디스크에 실제 입출력 처리
- 클라이언트(Client) - 파일을 읽고 쓰는 동작을 요청하는 어플리케이션
구글 파일 시스템 동작 과정
- 클라이언트가 마스터에게 파일의 읽기/쓰기를 요청
- 요청을 받은 마스터가 클라이언트와 가장 가까운 청크 서버의 정보를 클라이언트에게 전달
- 클라이언트는 전달 받은 정보를 바탕으로 청크 서버와 직접 통신하며 파일의 읽기/쓰기를 실행
MapReduce
- 여러 노드에 태스크를 분배하는 방법
- 하나의 큰 작업을 작은 작업으로 분할(Map)하고 작업 결과를 합치는(Reduce) 방법
하둡 구성 요소
하둡 분산형 파일 시스템(Hadoop Distributed File System, HDFS)
- 하둡 네트워크에 연결된 기기의 데이터를 저장하는 분산형 파일 시스템
- 여러 서버에 대용량 파일을 나누어서 저장
- 서버에 데이터를 중복 저장함으로써 데이터 안전성을 얻음
특징
- HDFS는 데이터를 저장하면 다수의 노드에 복제 데이터도 함께 저장하여 데이터 유실을 방지
- HDFS에 파일을 저장하거나 저장된 파일을 조회하려면 스트리밍 방식으로 데이터에 접근해야 함
- 한번 저장한 데이터는 수정할 수 없고, 읽기만 가능
- 데이터 수정은 불가능하지만 파일 이동, 삭제, 복사할 수 있는 인터페이스 제공
HDFS 구성
- HDFS는 마스터/슬레이브 구조를 가짐
- HDFS에서 마스터 역할 → NameNode 서버 1대 HDFS에서 슬레이브 역할 → DataNode 서버 여러 대로 구성
- Block 구조의 파일 시스템으로 저장하는 파일은 특정 사이즈의 Block으로 나누어져 분산된 서버에 저장
- 하나의 Block은 3개로 복제되고, 각각 다른 HDFS의 노드에 분산 저장됨
- 하둡 어플리케이션은 HDFS에 파일을 저장하거나, 저장된 파일을 읽기 위해 HDFS 클라이어트를 사용하며 클라이언트는 API 형태로 사용자에게 제공
- 클라이언트는 NameNode에 접속해서 원하는 파일이 저장된 Block의 위치를 확인하고, 해당 블록이 저장된 DataNode에서 직접 데이터를 조회
HDFS Cluster
- 하나의 NameNode & FileSystem을 관리
- 클라이언트의 접근을 통제하는 Master Server로 구성
- 클러스터의 각 노드들에는 DataNode가 하나씩 존재하고, DataNode는 실행될 때 마다 Node에 추가되는 스토리지를 관리
- HDFS는 네임스페이스를 공개, 유저 데이터가 파일에 저장되는 것을 허용
NameNode
- HDFS의 모든 메타 데이터 관리
- 메타 데이터
- 파일명
- 디렉터리
- 데이터 블록 크기
- 소유자: 소속 그룹
- 파일 속성
- DataNode와 블록 대응 정보
- 블록 ID와 해당 블록을 보유한 DataNode 정보
- 메타 데이터
- 파일과 디렉토리의 읽기(open), 닫기(close), 이름 바꾸기(rename) 등 파일 시스템의 네임 스페이스의 여러 기능을 수행
- DataNode와 Block들의 매핑 결정
DataNode
- 주기적으로 NameNode에서 블록 리포트(노드에 저장되어 있는 블록의 정보)를 전송
→ 이를 통해 NameNode는 DataNode가 정상 동작하는지 확인 - 파일 시스템의 클라이언트가 요구하는 읽기, 쓰기 기능을 수행
→ DataNode는 NameNode에서의 생성, 삭제, 복제 등과 같은 기능도 수행
맵리듀스(MapReduce)
- 대용량의 데이터 처리를 위한 분산 프로그래밍 모델이자 소프트웨어 프레임워크
- 맵 리듀스 프레임워크를 통해 대규모 분산 컴퓨팅 환경에서 대량의 데이터를 병렬로 분석 가능
- 맵(Map)과 리듀스(Reduce) 두 개의 메서드로 구성
- 흩어져 있는 데이터를 수직화하여 데이터를 각각의 종류별로 모으고(Map) 필터링과 sorting을 거쳐 데이터를 뽑아내는(Reduce) 분산처리 기술과 관련 프레임워크를 의미
MapReduce Workflow
- Input 데이터 입력
- 단어의 개수를 세기 위한 텍스트 파일들을 HDFS에 업로드, 각각의 파일은 블록 단위로 나누어 저장
- Splitting 입력된 데이터를 Input Split 단위로 분할, 이때 Input Split의 크기는 일반적으로 HDFS의 블록 크기와 동일
- 큰 데이터가 들어왔을 때, 64MB(하둡 2.0부터는 128MB) 단위 블록으로 분할
- 분할된 모든 블록은 같은 Map 작업을 수행하고 이후 Reduce 과정을 통해 합쳐짐
- MappingInput Split의 데이터를 레코드 단위로 한 줄씩 읽어서 사용자가 정의한 Map Function을 적용
- Map 연산을 통해 (단어, 1)의 리스트를 반환
- Map : 데이터를 (key, Value) 쌍의 형태로 연관성 있는 데이터 분류로 묶는 작업
- Shuffling
- 네트워크 통신이 가장 많이 일어나는 구간이자 MapReduce의 성능 저하가 가장 많이 발생하는 구간
- 생성된 Intermediate Key-Value 데이터를 Reduce로 전달하는 과정
- Reducing
- Reduce를 수행하여 각 블록에서 특정 단어가 몇 번 나왔는지를 계산
- Reduce: Map에서 출력된 데이터에서 중복 데이터를 제거하고 원하는 데이터를 추출하는 작업
- 결과를 합산하여 HDFS에 결과를 파일로 저장
하둡 에코시스템(Hadoop EcoSystem)
- 하둡 프레임워크를 이루고 있는 다양한 프로젝트의 모임
- 분산 코디네이터(ZooKeeper)
- 분산 동기화를 제공하고 그룹 서비스를 제공하는 중앙 집중식 서비스로 알맞은 분산처리 및 분산 환경을 구성하는 서버 설정을 통합적으로 관리
- 분산 환경에서 서버 간의 상호 조정에 필요한 다양한 서비스를 제공하는 시스템
2. 분산 리소스 관리(YARN)
- 작업 스케줄링 및 클러스터 리소스 관리를 위한 프레임워크로 맵리듀스, 하이브, 임팔라, 스파크 등 다양한 어플리케이션들이 얀에서 작업을 실행
- Resource Management 기능과 Job Scheduling/Monitoring 기능을 분리된 데몬에서 처리하는 것이 핵심 아이디어
Resource Manager
- 자원을 어디에 할당할지 결정하고 Cluster의 활용을 최적화하는 역할을 수행
- 처리요청에 대해 Request의 일부를 Node Manager로 전달
- Scheduler와 Application Manager로 구성
- Scheduler: Capacites, Queue 등의 제한 조건에 따라 실행중인 다양한 어플리케이션에 자원을 할당 하는 역할
- Application Manager : Job의 제출을 수락하고 Application Master를 실행하기 위한 첫번째 컨테이너 설정을 수행 실패 시, Application Master의 컨테이너를 재시작하게 해줌
- Container : 한 노드의 자원(RAM, CPU, Network 등)을 모아둔 패키지
Application Master
- 클러스터에서 애플리케이션의 실행을 조정하고 오류를 관리하는 역할을 수행
- Application : 하나의 작업 또는 작업의 DAG(Directed Acyclic Graph, 비순환 방향 그래프)
- Node Manager와 함께 작동하여 Task를 실행시키고 모니터링하며 Resource Manager를 통해 자원을 협상하고 할당 받음
- 할당 받은 자원의 상태를 추적하고 모니터링하는 역할 포함
Node Manager
- 개별 노드를 관리하고 주어진 노드에서 사용자의 작업 및 Work Flow를 관리하는 역할을 수행
- Resource Manager에 등록 되어 노드의 상태를 판별하기 위한 HeartBeats를 전송
- Resource Manager를 통해 할당 받은 컨테이너의 자원 사용량을 감시하는 등의 관리 역할도 함께 수행
YARN의 동작 방식
- Client가 Resource Manager에게 Application에 대한 요청을 전송
- Resource Manager는 Application Master와 함께 Application을 등록 (이때, Application ID가 생성되며, 후에 Client에게 반환됨)
- Resource Manager는 각각의 분리된 Container에서 Application Master를 구동, 만약 컨테이너가 구동 불가능한 상태라면 적합한 컨테이너를 찾을 때까지 기다림
- Application Master가 Node Manager에게 컨테이너의 실행 명령을 전달
- Application 코드가 컨테이너에서 실행
- Client는 Application의 상태를 모니터링하기 위해 Resource Manager/Application Manager와 통신
- Application Master가 Resource Manager에서 등록 해지됨
🖥️MapReduce 실습
1. git clone 하기
git clone https://github.com/big-data-europe/docker-hadoop-spark-workbench.git
2. namenode & datanode 컨테이너 띄우기
docker-compose -f docker-compose-hive.yml up -d namenode
docker-compose -f docker-compose-hive.yml up -d datanode
3. volumes 연결 및 텍스트 파일 준비하기
docker-compose.yml 파일에서 data node의 volumes 경로를 확인한다
* :를 기준으로 [내 컴퓨터 경로] : [컨테이너 안 경로] 가 docker-compose 파일에 작성됨!
컴퓨터 지정 경로로 이동해서 텍스트 파일을 생성해준다
4. wordCount.java 작성
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
//입력 데이터를 단어 단위로 분할, 각 단어 빈도를 1로 설정
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ //키 유형, 입력값 유형, 단어, 단어 빈도 를 상속 받음
private final static IntWritable one = new IntWritable(1); #각 단어의 출현 빈도를 나타내는 변수 선언
private Text word = new Text(); //Mapper가 출력할 단어 저장
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString()); //입력된 텍스트 값을 StringTokenizer 객체로 변환
while (itr.hasMoreTokens()) { //토큰 단어 있는 동안 반복
word.set(itr.nextToken());//itr.nextToken()이 다음 단어를 가져오고, 이를 word 객체에 설정
context.write(word, one); //각 단어를 출력
}
}
}
//동일한 단어를 그룹화하고 각 그룹의 단어 빈도를 합산
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> { //입력 키 유형, 단어 빈도 유형 상속
private IntWritable result = new IntWritable();
//현재 처리 중인 키, 동일 키에 대해 mapper가 출력한 값 리스트를 매개변수로 사용
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) { //values 리스트에 포함된 InWritable 값 순회 -> value는 특정 단어에 대해 Mapper가 출력한 모든 1들의 리스트
sum += val.get(); // 각 값을 sum 변수에 저장
}
result.set(sum); //최종 합산 결과 sum을 result 변수에 저장
context.write(key, result); //키와 최종 빈도를 context에 기록
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class); //작업 실행 시 사용할 클래스가 포함된 JAR 파일 설정
job.setMapperClass(TokenizerMapper.class); //사용할 Mapper 클래스 설정
job.setCombinerClass(IntSumReducer.class); //combiner 클래스 설정, combiner = reducer의 작업을 mapper와 reducer 사이에서 미리 수행, 성능 향상
job.setReducerClass(IntSumReducer.class);//사용할 Reducer 클래스 설정
job.setOutputKeyClass(Text.class); //Mapper와 Reducer의 출력 키 타입 설정
job.setOutputValueClass(IntWritable.class); //Mapper와 Reducer의 출력 값 타입 설정
FileInputFormat.addInputPath(job, new Path(args[0])); //입력 파일 경로를 설정
FileOutputFormat.setOutputPath(job, new Path(args[1])); //출력 파일 셩로 설정
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
5. 컴파일
docker exec -it {컨테이너 ID} /bin/bash
cd /hadoop/dfs/data
ls
ls로 파일 목록을 확인하면 방금 생성한 word.txt와 wordCount.java 파일이 있는 것을 확인할 수 있다
환경변수 $JAVA_PATH, $PATH, $HADOOP_CLASSPATH 를 설정해준다
echo $JAVA_HOME #JAVA_HOME은 확인만!
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
두 개 명령어를 실행한다
#자바 파일 컴파일
hadoop com.sun.tools.javac.Main WordCount.java
#컴파일된 class 파일을 jar 파일로 패키징
jar cf wc.jar WordCount*.class
6. 실행
HDFS로 파일을 복사해서 올린다
hadoop fs -put ./word.txt /
잘 올라간 것을 확인할 수 있다
java 코드에서 0번째 파라미터로 input 파일의 hdfs 경로를 주고, 1번째 파라미터로 output이 저장될 경로를 명시했으므로 이에 맞춰 생성한 jar 파일을 hadoop jar 명령으로 실행한다
hadoop jar wc.jar WordCount /words.txt /output
로그가 찍히는 것을확인할 수 있다!
hadoop fs -ls /output
hadoop fs -cat /output/part-r-00000
지정한 output 경로로 가서 생성된 파일 내용을 확인해보면
잘 카운트 된 것을 확인할 수 있었다
출처:
https://so-easy-coding.tistory.com/15
https://ryufree.tistory.com/205
https://mangkyu.tistory.com/127
https://velog.io/@ha0kim/2021-03-02
https://velog.io/@kimdukbae/HDFS-Hadoop-Distributed-File-System
'DataEngineering' 카테고리의 다른 글
[Kafka & Spark] 실시간으로 사람 detect하고 집계 테이블 만들기(2) (0) | 2024.08.25 |
---|---|
[Apache Kafka] 실시간으로 사람 detect하고 집계 테이블 만들기 (2) | 2024.08.18 |
[Apache Spark] 스트림 데이터 처리 (2) | 2024.08.04 |
[Apache Airflow] Apache Airflow 기반의 데이터 파이프라인 CH2 정리 (0) | 2024.07.25 |
[Apache Airflow] 도커로 환경 구축하기 (0) | 2024.07.24 |