spark rdd 예제

향상된 파이썬 인터프리터인 IPython에서 PySpark 쉘을 시작할 수도 있습니다. PySpark는 IPython 1.0.0 이상과 함께 작동합니다. IPython을 사용하려면 BIN/pyspark를 실행할 때 PYSPARK_DRIVER_PYTHON 변수를 ipython으로 설정합니다: 태그: 기존 아파치 스파크외부 데이터 집합쇼로 스파크 rddparallelized 컬렉션을 만들 수 있는 스파크 Resilient 분산에서 RDD를 만들 수 있는 방법 시퀀스 파일에 대한 스파크 RDD를 만드는 데이터 세트Tranformation 및 작업 웨이, K와 V가 파일의 키와 값의 유형인 스파크컨텍스트의 시퀀스File[K, V] 방법을 사용합니다. 이러한 클래스는 IntWritable 및 텍스트와 같은 Hadoop의 쓰기 가능한 인터페이스의 하위 클래스여야 합니다. 또한 Spark를 사용하면 몇 가지 일반적인 Writables에 대한 네이티브 형식을 지정할 수 있습니다. 예를 들어 sequenceFile[Int, 문자열]은 자동으로 IntWritables 및 텍스트를 읽습니다. 이 예제에서는 데이터베이스에 저장된 테이블을 읽고 모든 연령대의 인원수를 계산합니다. 마지막으로 계산된 결과를 JSON 형식으로 S3에 저장합니다. 간단한 MySQL 테이블 « 사람 »이 예제에서 사용되며 이 테이블에는 « 이름 » 및 « 나이 »라는 두 개의 열이 있습니다. 이 메서드는 파일에 대 한 URI (컴퓨터의 로컬 경로 또는 hdfs://, s3n://, etc URI)를 사용 하 고 줄의 컬렉션으로 읽습니다.

다음은 파일 test_file을 가져와 SparkContext의 textFile 메서드를 사용하여 RDD를 만든 예제 호출입니다. 여기에서는 HDFS에서 새 데이터 집합/텍스트 파일을 로드하여 새 RDD를 만듭니다. 같은 경우 아래 스크린 샷을 참조하십시오. 생성된 분산 데이터 집합(distData)은 병렬로 작동할 수 있습니다. 예를 들어 distData.reduce((a, b) =b)를 호출하여 배열의 요소를 추가할 수 있습니다. 나중에 분산 데이터 집합에 대한 작업을 설명합니다. 예를 들어 counts.sortByKey()를 사용하여 쌍을 사전순으로 정렬하고 마지막으로 counts.collect()를 사용하여 개체의 배열로 드라이버 프로그램으로 가져올 수도 있습니다. RDD는 기존 데이터 집합에서 새 데이터 집합을 만드는 변환과 데이터 집합에서 계산을 실행한 후 드라이버 프로그램에 값을 반환하는 작업의 두 가지 유형의 작업을 지원합니다. 예를 들어 맵은 함수를 통해 각 데이터 집합 요소를 전달하고 결과를 나타내는 새 RDD를 반환하는 변환입니다. 반면에 reduce는 일부 함수를 사용하여 RDD의 모든 요소를 집계하고 최종 결과를 드라이버 프로그램에 반환하는 작업입니다(분산 데이터 집합을 반환하는 병렬 reduceByKey도 있음). 참고 – 이 예제에서는 .rdd 메서드가 사용되는 것을 보았습니다.