Programming Language/Java

Java의 Stream에서 parallelStream은 stream보다 항상 빠를까?

TwinParadox 2021. 7. 18. 20:13
728x90

결론부터 말하자면, 당연히 아니다.

병렬 처리라는 것이 단순히 작업을 쪼개서 수행하는 것에 그치는 게 아니라, 그것을 취합하는 과정도 있고 그 쪼개면서 발생하는 오버헤드, 컨텍스트 스위칭 등을 고려해야 하기 때문이다.

 

ParallelStream

Java8부터는 parallelStream(), parallel()만을 사용하고도 stream을 병렬 처리할 수 있게 한다. ForkJoinPool 관리 방식을 사용해서 복잡하던 스레드 관리 방식을 Fork와 Join을 통해서 작업들을 분할 정복(Divide and Conquer) 기법으로 처리한다. ParallelStream을 사용할 때 몇 가지 특징에 대해 알고 넘어가는 것이 좋다.

 

  • 병렬 처리이기 때문에, 순서를 보장하지 않는다.
  • 별도의 설정이 없으면 해당 어플리케이션이 구동되는 스펙에 따라 스레드 수가 결정된다.
  • 스레드가 N개 생성되었을 때, 하나는 메인 스레드로 스트림을 처리하는 기본 스레드, 나머지 N-1개의 스레드가 ForkJoinPool 스레드이다.
  • 가장 중요한 부분인데, 여기서 사용하는 ForkJoinPool을 Custom Pool을 별도로 생성해서 활용하지 않으면, JVM 인스턴스에서 공통적으로 사용하는 Pool을 사용한다.

 

고민의 시작

스레드 관리도 알아서 해주고, 병렬 처리로 성능 개선을 얻을 수 있다는 점이 상당히 매력적으로 다가왔다. 대규모 데이터 가공 작업에서 Stream API를 많이 사용했는데, 이 좋은 걸 안 쓸 이유가 없다고 생각하고, 성능을 개선시켜야겠다는 생각 하나만으로, 다음 조건을 만족하는 곳에 모두 parallelStream을 넣었다.

 

  • stream을 사용하고 있다.
  • 순서를 보장하지 않아도 된다.
  • IO를 사용하지 않는 작업이다. (이 부분은 추후 설명)

 

위 규칙대로 적용하고 나니, 당연스럽게도 stream을 사용하고 있는 대부분의 로직이 parallelStream으로 대체되었다. 문득, 병렬 처리 과정에서 발생하는 오버헤드를 생각해보니까 조건이 하나 더 필요할 것 같아서 다음 조건을 하나 더 추가했다.

 

  • 컬렉션 사이즈가 작은 경우(특히, 기존 컬렉션을 쪼갠 경우) parallelStream을 쓰지 않는다.

 

요소의 개수가 적은 컬렉션을 병렬 처리하겠다고 배보다 배꼽이 더 큰 오버헤드를 발생시키는 것이 옳지 않다고 생각해서, 어떤 컬렉션을 가공해서 작은 단위로 나누게 되면 그 컬렉션은 그냥 stream을 사용하기로 했다. 

이 규칙을 적용한 상태에서도 군데군데 parallelStream이 남아 있게 됐는데, "애초에 parallelStream을 쓰지 않아도 되는 게 아닌가"에 대한 의심이 지워지지 않아서 테스트를 진행해보기로 했다.

 

 

테스트

public class ParallelSingle {

	public static final int TEST_SIZE = 100000;

	public static void main(String[] args) {

		List<Person> people = new ArrayList<>();
		Random rand = new Random();
		for (int i = 0; i < TEST_SIZE; i++) {
			people.add(new Person(UUID.randomUUID().toString(), rand.nextInt(100), rand.nextInt(100) + 100));
		}

		// Warming up
		for (int i = 0; i < 10; i++) {
			people.stream().filter(p -> p.getAge() % 2 == 0);
			people.parallelStream().filter(p -> p.getAge() % 2 == 0);
		}

		// Simple Task
		System.out.println("Even Age");
		filterTest(people);

		// Grouping Task
		// Warming up
		for (int i = 0; i < 10; i++) {
			Map<Integer, List<Person>> map1 = people.stream().collect(groupingBy(Person::getAge));
			Map<Integer, List<Person>> map2 = people.parallelStream().collect(groupingBy(Person::getAge));
		}

		System.out.println("groupingBy");
		groupingTest(people);
	}

 

홀짝 분류

	private static void filterTest(List<Person> people) {
		long start = 0, end = 0, runningTime = 0;

		// Single Stream
		start = System.nanoTime();
		people.stream().filter(p -> p.getAge() % 2 == 0);
		end = System.nanoTime();
		runningTime = end - start;
		System.out.println("Single Stream : " + runningTime);

		// Parallel Stream
		start = System.nanoTime();
		people.parallelStream().filter(p -> p.getAge() % 2 == 0);
		end = System.nanoTime();
		runningTime = end - start;
		System.out.println("Parallel Stream : " + runningTime);
	}

1차
Even Age
Single Stream : 315900
Parallel Stream : 388500

2차

Even Age
Single Stream : 238200
Parallel Stream : 248400

3차

Even Age
Single Stream : 252400
Parallel Stream : 354400

 

 

groupingBy

	private static void groupingTest(List<Person> people) {
		long start = 0, end = 0, runningTime = 0;

		// Single Stream
		start = System.nanoTime();
		Map<Integer, List<Person>> map1 = people.stream().collect(groupingBy(Person::getAge));
		end = System.nanoTime();
		runningTime = end - start;
		System.out.println("Single Stream : " + runningTime);

		// Parallel Stream
		start = System.nanoTime();
		Map<Integer, List<Person>> map2 = people.parallelStream().collect(groupingBy(Person::getAge));
		end = System.nanoTime();
		runningTime = end - start;
		System.out.println("Parallel Stream : " + runningTime);
	}
@Getter @Setter
@NoArgsConstructor
@AllArgsConstructor
public class Person {
	public String name;
	public int age;
	public int height;
}

1차

groupingBy
Single Stream : 8353900
Parallel Stream : 7380200

2차

groupingBy
Single Stream : 11098300
Parallel Stream : 4982100

3차

groupingBy
Single Stream : 8352600
Parallel Stream : 5143900

 

테스트 결과에 대한 정리

데이터 처리 로직 대부분이 이런 식의 가공 로직임을 감안하면, 작업의 무겁지 않으면 parallelStream을 쓰는 것이 오히려 독이 될 수 있음을 시사하고 있다. 처리 요소의 수와 그 요소에 대한 작업의 비용을 감안해서, 작업이 무거운 작업일 때야 비로소 병렬 처리의 이점을 잘 보여줄 것이라는 생각이 들었다.

groupingBy에 대해 parallelStream이 더 나은 결과를 보여줬던 것처럼 병렬 처리 적용이 성능 개선을 보이는 경우도 있었지만, 시스템 전체 성능에는 큰 영향을 주지 않는 것으로 보여 모두 stream으로 되돌렸다. 추후 성능 이슈가 발생해서 개선 사항에 등록된다면, 먼저 테스트를 해보고 장애 발생 여부를 확인하고 정말로 성능이 개선되는지 확인해보고 적용하는 것이 좋을 것 같다.

 

 

다른 곳에서는 어떤 걸 권장하는가?

Effective Java나 이런 곳들의 표현을 빌려와 보자면...

 

스트림 파이프라인을 아무 생각 없이 parallelStream으로 돌리면 안 된다.

성능이 나빠지는 것에 그치지 않고, 응답 불가까지 발생시키는 심각한 장애와 결과에 오류가 있을 수 있다.

 

ArrayList, HashMap, HashSet, ConcurrentHashMap, 배열, int 범위, long 범위에서 효과가 가장 좋다.

이들 자료구조가 데이터를 원하는 크기로 정확하게 나눌 수 있고, 원소를 순차 실행할 때 참조 지역성이 뛰어나다.(메모리에 연속해서 저장되어 있다.)

 

파이프라인의 종단 연산이 어떤 것이냐에 따라 성능이 결정된다.

종단 연산 중 가장 parallelStream의 덕을 많이 보는 것은 reduction 작업과 그다음으로는, 조건에 맞으면 바로 반환하는 anyMatch, allMatch, noneMatch 등이 있다.

 

parallelStream은 성능 최적화 수단으로만 사용한다.

성능 최적화인지 확인하기 위해서는 그 가치를 테스트로 검증한다.

만약 직접 구현한 Stream, Iterable, Collection에 적용한다면, 그에 맞는 spliterator를 재정의하라.

 

 

 

결론

parallelStream은 정말 쉽게 stream 병렬 처리를 제공해준다. 세부 설정이나 복잡한 로직 없이 기존에 stream을 쓰듯 사용할 수 있는 편리함까지 제공해주지만, 병렬 처리 결과가 무조건 더 나은 결과를 보장한다고 볼 수는 없다.

처리 성능에 영향을 미치는 부분들, 분할 및 병합 과정에서의 비용, 멀티 스레드 환경에서의 컨텍스트 스위칭 비용 등에 대해 충분히 고려해야 하기 떄문에 신중해야 한다. 특정 로직에 대해 성능 개선을 위해 parallelStream을 적용하고자 한다면, 이것이 정말로 성능을 개선시켜줄 수 있는지, 혹 예상치 못한 장애를 발생시키지는 않는지에 대해 충분히 테스트도 진행하고 적용하는 것이 좋을 것 같다.

 

 

Reference

https://stackoverflow.com/questions/20375176/should-i-always-use-a-parallel-stream-when-possible

https://java-8-tips.readthedocs.io/en/stable/parallelization.html#conclusion

https://stackoverflow.com/questions/23170832/java-8s-streams-why-parallel-stream-is-slower

Effective Java

728x90
728x90