NIRVANA

[Apache Hadoop] 하둡(Hadoop)이란? 본문

DataEngineering

[Apache Hadoop] 하둡(Hadoop)이란?

녜잉 2024. 8. 11. 21:42

하둡(Hadoop)이란?

  • High-Availability Distributed Object-Oriented Platform
  • 빅데이터를 저장, 처리, 분석할 수 있는 자바 소프트웨어 프레임워크
    • 대용량의 데이터를 클러스터에서 병렬로 동시에 처리하여 처리 속도를 높이는 것을 목적으로 하는 분산처리를 위한 오픈 소스 프레임워크

 

하둡 개발 배경

  • 야후(Yahoo!)의 더그 커팅이 검색 엔진을 개발하는 과정에서 대용량의 비정형 데이터를 기존의 RDB 기술로 처리하는 데에 한계를 발견
  • 구글에서 발표한 GFS와 MapReduce 관련 논문을 참고하여 개발
  • 이후 Apache 재단으로 프로젝트가 넘어가 오픈 소스로 공개됨

 

💡구글 GFS와 MapReduce

GFS(Google File System, 구글 파일 시스템)

  • 구글 파일 시스템은 급속히 늘어나는 구글의 데이터 처리를 위해 설계된 대용량 분산 파일 시스템

구글 파일 시스템 구성

  • 마스터(Master) - GFS 전체의 상태를 관리하고 통제하는 중앙 서버
  • 청크 서버(Chunk Server) - 물리적인 하드 디스크에 실제 입출력 처리
  • 클라이언트(Client) - 파일을 읽고 쓰는 동작을 요청하는 어플리케이션

 

구글 파일 시스템 동작 과정

 

  • 클라이언트가 마스터에게 파일의 읽기/쓰기를 요청
  • 요청을 받은 마스터가 클라이언트와 가장 가까운 청크 서버의 정보를 클라이언트에게 전달
  • 클라이언트는 전달 받은 정보를 바탕으로 청크 서버와 직접 통신하며 파일의 읽기/쓰기를 실행

 

MapReduce

  • 여러 노드에 태스크를 분배하는 방법
  • 하나의 큰 작업을 작은 작업으로 분할(Map)하고 작업 결과를 합치는(Reduce) 방법

 

하둡 구성 요소

하둡 분산형 파일 시스템(Hadoop Distributed File System, HDFS)

  • 하둡 네트워크에 연결된 기기의 데이터를 저장하는 분산형 파일 시스템
  • 여러 서버에 대용량 파일을 나누어서 저장
    • 서버에 데이터를 중복 저장함으로써 데이터 안전성을 얻음

특징

  1. HDFS는 데이터를 저장하면 다수의 노드에 복제 데이터도 함께 저장하여 데이터 유실을 방지
  2. HDFS에 파일을 저장하거나 저장된 파일을 조회하려면 스트리밍 방식으로 데이터에 접근해야 함
  3. 한번 저장한 데이터는 수정할 수 없고, 읽기만 가능
  4. 데이터 수정은 불가능하지만 파일 이동, 삭제, 복사할 수 있는 인터페이스 제공

HDFS 구성

  • HDFS는 마스터/슬레이브 구조를 가짐
  • HDFS에서 마스터 역할 → NameNode 서버 1대 HDFS에서 슬레이브 역할 → DataNode 서버 여러 대로 구성
  • Block 구조의 파일 시스템으로 저장하는 파일은 특정 사이즈의 Block으로 나누어져 분산된 서버에 저장
    • 하나의 Block은 3개로 복제되고, 각각 다른 HDFS의 노드에 분산 저장됨
  • 하둡 어플리케이션은 HDFS에 파일을 저장하거나, 저장된 파일을 읽기 위해 HDFS 클라이어트를 사용하며 클라이언트는 API 형태로 사용자에게 제공
  • 클라이언트는 NameNode에 접속해서 원하는 파일이 저장된 Block의 위치를 확인하고, 해당 블록이 저장된 DataNode에서 직접 데이터를 조회

HDFS Cluster

  1. 하나의 NameNode & FileSystem을 관리
  2. 클라이언트의 접근을 통제하는 Master Server로 구성
  3. 클러스터의 각 노드들에는 DataNode가 하나씩 존재하고, DataNode는 실행될 때 마다 Node에 추가되는 스토리지를 관리
  4. HDFS는 네임스페이스를 공개, 유저 데이터가 파일에 저장되는 것을 허용

NameNode

  1. HDFS의 모든 메타 데이터 관리
    • 메타 데이터
      • 파일명
      • 디렉터리
      • 데이터 블록 크기
      • 소유자: 소속 그룹
      • 파일 속성
      • DataNode와 블록 대응 정보
        • 블록 ID와 해당 블록을 보유한 DataNode 정보
  2. 파일과 디렉토리의 읽기(open), 닫기(close), 이름 바꾸기(rename) 등 파일 시스템의 네임 스페이스의 여러 기능을 수행
  3. DataNode와 Block들의 매핑 결정

DataNode

  1. 주기적으로 NameNode에서 블록 리포트(노드에 저장되어 있는 블록의 정보)를 전송
    → 이를 통해 NameNode는 DataNode가 정상 동작하는지 확인
  2. 파일 시스템의 클라이언트가 요구하는 읽기, 쓰기 기능을 수행
    → DataNode는 NameNode에서의 생성, 삭제, 복제 등과 같은 기능도 수행

 

맵리듀스(MapReduce)

  • 대용량의 데이터 처리를 위한 분산 프로그래밍 모델이자 소프트웨어 프레임워크
  • 맵 리듀스 프레임워크를 통해 대규모 분산 컴퓨팅 환경에서 대량의 데이터를 병렬로 분석 가능
  • 맵(Map)과 리듀스(Reduce) 두 개의 메서드로 구성
    • 흩어져 있는 데이터를 수직화하여 데이터를 각각의 종류별로 모으고(Map) 필터링과 sorting을 거쳐 데이터를 뽑아내는(Reduce) 분산처리 기술과 관련 프레임워크를 의미

 

MapReduce Workflow

  1. Input 데이터 입력
    • 단어의 개수를 세기 위한 텍스트 파일들을 HDFS에 업로드, 각각의 파일은 블록 단위로 나누어 저장
  2. Splitting 입력된 데이터를 Input Split 단위로 분할, 이때 Input Split의 크기는 일반적으로 HDFS의 블록 크기와 동일
    • 큰 데이터가 들어왔을 때, 64MB(하둡 2.0부터는 128MB) 단위 블록으로 분할
    • 분할된 모든 블록은 같은 Map 작업을 수행하고 이후 Reduce 과정을 통해 합쳐짐
  3. MappingInput Split의 데이터를 레코드 단위로 한 줄씩 읽어서 사용자가 정의한 Map Function을 적용
    • Map 연산을 통해 (단어, 1)의 리스트를 반환
    • Map : 데이터를 (key, Value) 쌍의 형태로 연관성 있는 데이터 분류로 묶는 작업
  4. Shuffling
    • 네트워크 통신이 가장 많이 일어나는 구간이자 MapReduce의 성능 저하가 가장 많이 발생하는 구간
    • 생성된 Intermediate Key-Value 데이터를 Reduce로 전달하는 과정
  5. Reducing
    • Reduce를 수행하여 각 블록에서 특정 단어가 몇 번 나왔는지를 계산
    • Reduce: Map에서 출력된 데이터에서 중복 데이터를 제거하고 원하는 데이터를 추출하는 작업
  6. 결과를 합산하여 HDFS에 결과를 파일로 저장

 

하둡 에코시스템(Hadoop EcoSystem)

  • 하둡 프레임워크를 이루고 있는 다양한 프로젝트의 모임

 

  1. 분산 코디네이터(ZooKeeper)
  • 분산 동기화를 제공하고 그룹 서비스를 제공하는 중앙 집중식 서비스로 알맞은 분산처리 및 분산 환경을 구성하는 서버 설정을 통합적으로 관리
  • 분산 환경에서 서버 간의 상호 조정에 필요한 다양한 서비스를 제공하는 시스템

 

2. 분산 리소스 관리(YARN)

  • 작업 스케줄링 및 클러스터 리소스 관리를 위한 프레임워크로 맵리듀스, 하이브, 임팔라, 스파크 등 다양한 어플리케이션들이 얀에서 작업을 실행
  • Resource Management 기능과 Job Scheduling/Monitoring 기능을 분리된 데몬에서 처리하는 것이 핵심 아이디어

 

Resource Manager

  • 자원을 어디에 할당할지 결정하고 Cluster의 활용을 최적화하는 역할을 수행
  • 처리요청에 대해 Request의 일부를 Node Manager로 전달
  • Scheduler와 Application Manager로 구성
    • Scheduler: Capacites, Queue 등의 제한 조건에 따라 실행중인 다양한 어플리케이션에 자원을 할당 하는 역할
    • Application Manager : Job의 제출을 수락하고 Application Master를 실행하기 위한 첫번째 컨테이너 설정을 수행 실패 시, Application Master의 컨테이너를 재시작하게 해줌
      • Container : 한 노드의 자원(RAM, CPU, Network 등)을 모아둔 패키지

 

Application Master

  • 클러스터에서 애플리케이션의 실행을 조정하고 오류를 관리하는 역할을 수행
    • Application : 하나의 작업 또는 작업의 DAG(Directed Acyclic Graph, 비순환 방향 그래프)
  • Node Manager와 함께 작동하여 Task를 실행시키고 모니터링하며 Resource Manager를 통해 자원을 협상하고 할당 받음
    • 할당 받은 자원의 상태를 추적하고 모니터링하는 역할 포함

 

Node Manager

  • 개별 노드를 관리하고 주어진 노드에서 사용자의 작업 및 Work Flow를 관리하는 역할을 수행
  • Resource Manager에 등록 되어 노드의 상태를 판별하기 위한 HeartBeats를 전송
  • Resource Manager를 통해 할당 받은 컨테이너의 자원 사용량을 감시하는 등의 관리 역할도 함께 수행

 

YARN의 동작 방식

  1. Client가 Resource Manager에게 Application에 대한 요청을 전송
  2. Resource Manager는 Application Master와 함께 Application을 등록 (이때, Application ID가 생성되며, 후에 Client에게 반환됨)
  3. Resource Manager는 각각의 분리된 Container에서 Application Master를 구동, 만약 컨테이너가 구동 불가능한 상태라면 적합한 컨테이너를 찾을 때까지 기다림
  4. Application Master가 Node Manager에게 컨테이너의 실행 명령을 전달
  5. Application 코드가 컨테이너에서 실행
  6. Client는 Application의 상태를 모니터링하기 위해 Resource Manager/Application Manager와 통신
  7. Application Master가 Resource Manager에서 등록 해지됨

 


 

🖥️MapReduce 실습

 

1. git clone 하기

git clone https://github.com/big-data-europe/docker-hadoop-spark-workbench.git

 

 

2. namenode & datanode 컨테이너 띄우기 

docker-compose -f docker-compose-hive.yml up -d namenode
docker-compose -f docker-compose-hive.yml up -d datanode

 

3. volumes 연결 및 텍스트 파일 준비하기

docker-compose.yml 파일에서 data node의 volumes 경로를 확인한다 

* :를 기준으로 [내 컴퓨터 경로] : [컨테이너 안 경로] 가 docker-compose 파일에 작성됨! 

 

컴퓨터 지정 경로로 이동해서 텍스트 파일을 생성해준다 

 

 

4. wordCount.java 작성

 

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

//입력 데이터를 단어 단위로 분할, 각 단어 빈도를 1로 설정
  public static class TokenizerMapper  
       extends Mapper<Object, Text, Text, IntWritable>{ //키 유형, 입력값 유형, 단어, 단어 빈도 를 상속 받음 

    private final static IntWritable one = new IntWritable(1); #각 단어의 출현 빈도를 나타내는 변수 선언 
    private Text word = new Text(); //Mapper가 출력할 단어 저장 

    public void map(Object key, Text value, Context context 
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString()); //입력된 텍스트 값을 StringTokenizer 객체로 변환 
      while (itr.hasMoreTokens()) { //토큰 단어 있는 동안 반복 
        word.set(itr.nextToken());//itr.nextToken()이 다음 단어를 가져오고, 이를 word 객체에 설정 
        context.write(word, one); //각 단어를 출력 
      }
    }
  }

//동일한 단어를 그룹화하고 각 그룹의 단어 빈도를 합산 
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> { //입력 키 유형, 단어 빈도 유형 상속 
    private IntWritable result = new IntWritable();
//현재 처리 중인 키, 동일 키에 대해 mapper가 출력한 값 리스트를 매개변수로 사용 
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) { //values 리스트에 포함된 InWritable 값 순회  -> value는 특정 단어에 대해 Mapper가 출력한 모든 1들의 리스트 
        sum += val.get(); // 각 값을 sum 변수에 저장 
      }
      result.set(sum); //최종 합산 결과 sum을 result 변수에 저장 
      context.write(key, result); //키와 최종 빈도를 context에 기록 
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class); //작업 실행 시 사용할 클래스가 포함된 JAR 파일 설정 
    job.setMapperClass(TokenizerMapper.class); //사용할 Mapper 클래스 설정 
    job.setCombinerClass(IntSumReducer.class); //combiner 클래스 설정, combiner = reducer의 작업을 mapper와 reducer 사이에서 미리 수행, 성능 향상 
    job.setReducerClass(IntSumReducer.class);//사용할 Reducer 클래스 설정 
    job.setOutputKeyClass(Text.class); //Mapper와 Reducer의 출력 키 타입 설정 
    job.setOutputValueClass(IntWritable.class); //Mapper와 Reducer의 출력 값 타입 설정 
    
    FileInputFormat.addInputPath(job, new Path(args[0])); //입력 파일 경로를 설정 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); //출력 파일 셩로 설정 
    
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

 

 

5. 컴파일

docker exec -it {컨테이너 ID} /bin/bash
cd /hadoop/dfs/data
ls

 

ls로 파일 목록을 확인하면 방금 생성한 word.txt와 wordCount.java 파일이 있는 것을 확인할 수 있다

 

환경변수 $JAVA_PATH,  $PATH, $HADOOP_CLASSPATH 를 설정해준다 

echo $JAVA_HOME #JAVA_HOME은 확인만!
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

 

두 개 명령어를 실행한다 

 

#자바 파일 컴파일
hadoop com.sun.tools.javac.Main WordCount.java 

#컴파일된 class 파일을 jar 파일로 패키징 
jar cf wc.jar WordCount*.class

 

 

6. 실행

 

HDFS로 파일을 복사해서 올린다

hadoop fs -put ./word.txt /

 

잘 올라간 것을 확인할 수 있다 

 

java 코드에서 0번째 파라미터로 input 파일의 hdfs 경로를 주고, 1번째 파라미터로 output이 저장될 경로를 명시했으므로 이에 맞춰 생성한 jar 파일을 hadoop jar 명령으로 실행한다 

hadoop jar wc.jar WordCount /words.txt /output

로그가 찍히는 것을확인할 수 있다!

 

hadoop fs -ls /output
hadoop fs -cat /output/part-r-00000

지정한 output 경로로 가서 생성된 파일 내용을 확인해보면

 

 

잘 카운트 된 것을 확인할 수 있었다 

 

 

 

출처:

https://so-easy-coding.tistory.com/15

https://ryufree.tistory.com/205

https://mangkyu.tistory.com/127

https://velog.io/@ha0kim/2021-03-02

https://velog.io/@kimdukbae/HDFS-Hadoop-Distributed-File-System

https://velog.io/@spdlqjfire/하둡-프로그래밍-MapReduce