블로그 이미지
평범하게 살고 싶은 월급쟁이 기술적인 토론 환영합니다.같이 이야기 하고 싶으시면 부담 말고 연락주세요:이메일-bwcho75골뱅이지메일 닷컴. 조대협


Archive»


 
 

Spark Key/Value Pairs

조대협 

http://bcho.tistory.com


RDD에는 어떤 데이타 형식이라던지 저장이 가능한데, 그중에서 Pair RDD라는 RDD가 있다. 이  RDD는 Key-Value  형태로 데이타를 저장하기 때문에, 병렬 데이타 처리 부분에서 그룹핑과 같은 추가적인 기능을 사용할 수 있다.

예를 들어 reduceByKey 와 같이 특정 키를 중심으로 데이타 연산 (각 키 값 기반으로 합이나 평균을 구한다던가) key 기반으로 join 을 한다던가와 같은 그룹핑 연산에 유용하게 사용할 수 있다.

Pair RDD를 생성하는 방법은 다음과 같다.

Java
mapToPair나 flatMapToPair 라는 메서드를 사용하면 된다.

mapToPair등의 함수를 이용할때, 아래와 같이 람다 표현식을 사용하는 방식이 있고

RDD.mapToParis( d-> new Tuple2(key,value))
(d는 RDD에서 읽어오는 값. Key는 새로운  RDD를 생성할때, 해당 Tuple의 키값,  value는 해당 Tuple의 Value값)

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(-> new Tuple2(s, 1));

또는 아래와 같이, Pair함수를 정의한 후에, Pair 함수를 mapToPair함수등에 넘기는 방법이 있다.

아래는 텍스트 라인에서 첫번째 단어를 Key로하고, 해당 라인을 Value로 하는 코드 예제이다.

PairFunction<String, String, String> keyData =
  new PairFunction<String, String, String>() {
  public Tuple2<String, String> call(String x) {
    return new Tuple2(x.split(" ")[0], x);
  }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
* Learning spark 코드 참조

Python

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)



Apache Spark(스파크) - RDD Persistence (스토리지 옵션에 대해서)


조대협 (http://bcho.tistory.com)


Spark Persistence에 대해서


앞에 글에서 Spark RDD가 메모리에 상주 되는 방법에 대해서 간략하게 언급했는데, 다시 되 짚어 보면 Spark의 RDD는 filter() 등. 여러  Transformation Operation을 실행하더라도  Transformation  단계가 아니라 Action이 수행되는 단계에 로드된다고 설명하였다.


그리고, 매번 해당 RDD가 Action으로 수행될 때마다 다시금 소스에서 부터 다시 로드되서 수행된다고 했는데, 그렇다면 매번 로드 해서 계산하여 사용하는 것이 아니라, 저장해놓고 사용 하는 방법이 무엇이 있을까?


스파크에서는 RDD 를 저장해놓고 사용하는 기능으로 persist()와 cache() 라는 두 가지 오퍼레이션을 지원한다.

스파크는 RDD를 저장함에 있어서,  메모리와 디스크 두 가지 영역을 사용하며, 옵션에 따라서 RDD의 저장 영역을 지정할 수 있다.


기본 디폴트는 메모리에 저장하고, 옵션으로 디스크를 지정할 수 있다. 디스크를 지정하면, 메모리에서 모지란 부분을 마치 swapping 하듯이 디스크에 데이타를 저장한다.


아래 옵션 참고 




출처 : https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


여기에 특이한 점은, 메모리나 디스크에 저장할때, RDD를 RAW (원본 형식)으로 저장할 것인지 자바의 Serialized 된 형태로 저장할 지를 선택할 수 있다. ( Serealized 된 형태로 저장하기 MEMORY_ONLY_SER, MEMORY_AND_DISK_SER) 이렇게 저장하면, 메모리 사용량은 더 줄일 수 있지만, 반대로 저장시 Serizalied하는 오버로드와, 읽을때 De-Seriazlie 하는 오버로드가 더 붙어서 CPU 사용량은 오히려 증가하게 된다.


아래 데이타는 http://sujee.net/2015/01/22/understanding-spark-caching/#.VWcOh1ntlBc 의 데이타 긴데,

Serialized 로 저장하는 경우, 최대 약 4배 정도의 메모리 용량을 절약할 수 있으나, 반대로, 처리 시간은 400배 이상이 더 들어간다.

 









 


다음으로, 특이한 옵션중에 하나가 OFF_HEAP 이라는 옵션인데, 스파크는 JVM 상에서 동작하기 때문에, 스파크가 저장하는 메모리란 JVM 상의 메모리를 뜻한다. JVM  특성상 Garbage collection 에 의한 성능 제약을 받을 수 있으며 또한 별도로 서로 복제가 되지 않기 때문에, (기본 옵션의 경우에만), 안정적인 서비스를 원할 경우에는 별도의 복제 옵션을 선택해야 한다.

이런 문제를 해결하기 위한 다른 옵션으로는 JVM 내에 데이타를 저장하는 것이 아니라, 별도의 JVM 외의 메모리 공간에 데이타를 저장하는 방식이 OFF_HEAP 이라는 옵션이다. 아직 안정화 되지는 않았지만, http://tachyon-project.org/ 이라는 메모리 클러스터를 이용하여, 서로 복제가 가능한 외부 메모리 클러스터에 저장하는 방식으로, JVM  상 메모리 보다는 성능이 약간 떨어지지만, 디스크보다는 빠르며, 큰 메모리 공간을 장애 대응에 대한 상관 없이 (자체 적으로 HA  기능을 제공함) 사용이 가능하다. 

 cf. Redis나  Infinispan등과 같은 메모리 기반의 데이타 그리드 솔루션의 하나인 Hazelcast 역시 JVM 밖의 네이티브 메모리 공간에 데이타를 저장하는 유사한 방식을 사용한다.


Persist vs Cache


그렇다면, persist()와 cache()의 차이점은 무엇인가? cache()는  persist() 에서 저장 옵션을 MEMORY_ONLY로 한 옵션과 동일하다.


저장된 RDD는 메모리나 디스크에서 언제 삭제 되는가?

RDD가 메모리나 디스크에 로드되었다고 항상 로드된 상태로 있는 것이 아니다. 기본적으로 LRU (Least Recently Used)  알고리즘 (가장 근래에 사용되지 않은 데이타가 삭제되는 방식)에 의해서 삭제가 되가나, 또는 RDD.unpersiste() 함수를 호출하면 명시적으로 메모리나 디스크에서 삭제할 수 있다.


언제 어떤 타입의 Peristence옵션을 사용해야 하는가?


가장 좋은 옵션은 디폴트 옵션인  MEMORY_ONLY  옵션이다. 가장 빠르다.

다음으로 메모리가 모자를 경우에는  MEMORY_ONLY_SER 옵션을 이용하면, Seriazlied 된 형태로 저장하기 때문에 메모리 공간은 줄일 수 있으나, 대신 CPU 사용률이 올라간다. 그래도 여전히 빠른 방식이다.

데이타 양이 많을 경우에는 DISK에 저장하는 옵션보다는 차라리 Persist 를 하지 않고, 필요할때 마다 재계산 하는 것이 더 빠를 수 있다.

빠른 응답이 필요한 경우에 Persist 된 데이타에 대한 유실을 방지하려면, replicated storage 옵션을 사용하는 것이 좋다. (MEMORY_ONLY2 등).  다른 스토리지 타입 역시, 장애로 인해서 데이타가 유실되더라도 재계산을 통하여 복구가 가능하지만, 재계산 하는 것 보다는 RDD 의 복제본을 저장해 놓고, 장애시 페일오버 하는 것이 빠르기 때문에, 빠른 응답시간을 요구로 하는 웹 애플리케이션의 경우 이 스토리지 타입이 유리하다. (단, 메모리 사용량은 복제본을 저장하는데도 사용되기 때문에 상대적으로 일반 스토리지 옵션에 비해서 메모리 여유가 적다.)



참고 

Learning Spark

Spark document - https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


참고 - 이 글은 제가 스파크를 혼자 공부하면서 문서만을 통해서 정리한 글이기 때문에, 실무적인 경험이 많이 녹아 들어 있지 않습니다. 

Spark RDD  이해하기 #1

조대협(http://bcho.tistory.com)


기본 개념 잡기

RDD 는 여러 분산 노드에 걸쳐서 저장되는 변경이 불가능한 데이타(객체)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 된다. (서로 다른 노드에서 분리되서 실행되는). 

쉽게 말해서 스파크 내에 저장된 데이타를 RDD라고 하고, 변경이 불가능하다. 변경을 하려면 새로운 데이타 셋을 생성해야 한다.

RDD의 생성은 외부로 부터 데이타를 로딩하거나 또는 코드에서 생성된 데이타를 저장함으로써 생성할 수 있다.

RDD에서는 딱 두 가지 오퍼레이션만 지원한다.
  • Transformation : 기존의 RDD 데이타를 변경하여 새로운 RDD 데이타를 생성해내는 것. 흔한 케이스는 filter와 같이 특정 데이타만 뽑아 내거나 map 함수 처럼, 데이타를 분산 배치 하는 것 등을 들 수 있다.
  • Action : RDD 값을 기반으로 무엇인가를 계산해서(computation) 결과를 (셋이 아닌) 생성해 내는것으로 가장 쉬운 예로는 count()와 같은 operation들을 들 수 있다.

RDD의 데이타 로딩 방식은 Lazy 로딩 컨셉을 사용하는데, 예를 들어 sc.textFile(“파일”)로 파일을 로딩하더라도 실제로 로딩이 되지 않는다. 파일이 로딩되서 메모리에 올라가는 시점은 action을 이용해서 개선할 당시만 올라간다.
아래 코드를 보자 아래 코드는 “README.md” 파일을 RDD로 로딩 한후에

  1. pythonLines에 “Python”이라는 단어를 가지고 있는 라인만 추려서 새로운 RDD를 만들고,
  2. 그 다음 count() action을 이용하여, 그 줄 수 를 카운트 하는 예제이다.




그렇다면, 언제 실제 README.md 파일이 읽혀질까? 실제로 읽혀지는 시기는 README.md 파일을 sc.textFile로 오픈할 때가 아니라 .count() 라는 액션이 수행될 때 이다.
이유는 파일을 오픈할때 부터 RDD를 메모리에 올려놓게 되면 데이타가 클 경우, 전체가 메모리에 올라가야 하는데, 일반적으로 filter 등을 이용해서 데이타를 정재한 후에,  action을 수행하기 때문에, action을 수행할때, action수행시 필요한 부분만 (filter가 적용된 부분만) 메모리에 올리면 훨씬 작은 부분을 올릴 수 있기 때문에 수행시에 데이타를 로딩하게 된다. 그렇다면 로딩된 데이타는 언제 지워질까?
action을 수행한다음 바로 지워진다.

위에서 보면 lines.count()를 두번 수행하였는데, 이 실행시 마다 README.md 파일을 다시 읽어드린다. 만약에, 한번 읽어드린 RDD를 메모리에 상주하고 계속해서 재 사용하고 싶다면 RDD.persist()라는 메서드를 이용하면, RDD를 메모리에 상주 시킬 수 있다.

RDD 생성하기

앞에서도 언급했듯이, RDD를 생성하는 방법은 크게 두가지가 있다. 
  • 외부로 부터 파일을 읽어서 로딩하거나 파일은 일반 파일을 읽거나 S3,HBase,HDFS,Cassandra 등에서 데이타를 읽어올 수 있다. 
    파이썬 예제) lines = sc.textFile(“/path/filename.txt”)
  • 또는 드라이버 프로그램내에서 생성된 collection을 parallelize() 라는 메서드를 이용해서 RDD 화 하는 방법이다. (자바 컬렉션등을 RDD로 생성)
    자바 예제) JavaRDD<String> lines = sc.parallelize(Array.asList(“first”,”second”))

RDD Operations

1) Transformation (변환)

변환은 RDD를 필터링하거나 변환하여 새로운 RDD를 리턴하는 오퍼레이션이다.
다음 코드는 README.md 라는 파일을 읽어서 f 라는 RDD를 생성한후
f라는 RDD 에서 “Apache”라는 문자열을 가진 라인만을 모아서 t라는 RDD를 새롭게 생성한 후 화면으로 출력하는 예제이다.
f와 t는 전혀 다른  RDD로 RDD t는 filter에 의해서 새롭게 생성되었다.

<그림. 파이썬 예제>


변환 함수는 filter 뿐 아니라, map, group등 여러가지 함수들이 있으며, 자세한 함수 리스트는 https://spark.apache.org/docs/latest/programming-guide.html#transformations 를 참고하기 바란다.

2) Action (액션)

액션은 RDD를 가지고 계산을 해서 최종 결과를 리턴하거나 또는 데이타를 외부 저장소(External Storage)등에 쓸 수 있다.
최종 결과를 리턴하는 오퍼레이션으로는 앞의 예제에서도 설명한 count()나, 첫번째 element를 리턴하는 first등이 있으며, RDD를 저장하는 오퍼레이션으로는 saveAsTextFile(path)와 같은 오퍼레이션등이 있다.
 


본 포스팅을 오라일사의 "Learning Spark" 과  Sparing Programming Guide를 참고하여 작성하였습니다. https://spark.apache.org/docs/latest/programming-guide.html