병렬 프로그래밍을 사용하면 작업을 CPU가 다룰 수 있는 단위로 쪼개서 여러 스레드에 나눠 줄 수 있다.

 

이번 장에서는 CPU를 사용하는 병렬 처리 작업만 다룬다.

 

IO 작업처럼 비동기 작업을 병렬로 실행하고자 하면 2장(2.4절) 참고하기.

 

4.1 데이터의 병렬처리

void RotateMatrices(IEnumerable<Matrix3x2> matrices)
{
    Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}

Parallel 형식에선 상기와 같이 ForEach 메서드를 통해 여러 행렬을 전달 받아 모두 회전 시킨다.

 

조건에 따라 루프를 일찍 종료하고자 한다면 다음 예제와 같이 루프를 중지할 수 있다.

using System.Numerics;

void RotateMatrices(IEnumerable<Matrix3x2> matrices)
{
    Parallel.ForEach(matrices, (matrix, state) =>
    {
        if (!matrix.IsInvertible)
        {
            state.Stop();
        }
        else
        {
            matrix.Invert();
        }
    });
}

(각 행렬을 역변환하나, 역변환할 수 없는 행렬을 맞닥뜨리면 루프 중지)

 

ParallelLoopState.Stop으로 병렬 루프를 중지해서 루프 바디의 추가 호출(아직 시작되지 않은 호출)을 방지한다(더 이상 실행 안함). 하지만  이미 실행 중인 것을 중단 할 수는 없다.

만약 예제에서 세번째 행렬이 역변환할 수 없는 행렬로서 루프를 중단하더라도, 이미 네번째, 다 섯 번째 행렬이 처리 중일 수 있다. 4번째와 5번째 행렬은 처리가 되어야 한다.

 

완전한 코드

using System;
using System.Collections.Generic;
using System.Numerics;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // 샘플 행렬 리스트
        var matrices = new List<Matrix3x2>
        {
            Matrix3x2.CreateRotation(0.5f),
            Matrix3x2.CreateScale(2.0f),
            Matrix3x2.Identity,
            new Matrix3x2(0, 0, 0, 0, 0, 0) // 역행렬 불가
        };

        var invertedResults = RotateMatrices(matrices);

        Console.WriteLine("\n=== 결과 ===");
        foreach (var m in invertedResults)
        {
            Console.WriteLine(m);
        }
    }

    static List<Matrix3x2> RotateMatrices(IEnumerable<Matrix3x2> matrices)
    {
        var results = new List<Matrix3x2>();
        object lockObj = new object();

        Parallel.ForEach(matrices, (matrix, state) =>
        {
            if (!Matrix3x2.Invert(matrix, out var inverted))
            {
                Console.WriteLine("역행렬 불가 → 병렬 중단");
                state.Stop();
            }
            else
            {
                Console.WriteLine("역행렬 계산 성공");
                lock (lockObj)
                {
                    results.Add(inverted);
                }
            }
        });

        return results;
    }
}

#출력
역행렬 계산 성공
역행렬 계산 성공
역행렬 불가 → 병렬 중단
역행렬 계산 성공

=== 결과 ===
{ {M11:0.5 M12:-0} {M21:-0 M22:0.5} {M31:0 M32:0} }
{ {M11:0.87758255 M12:-0.47942555} {M21:0.47942555 M22:0.87758255} {M31:-0 M32:0} }
{ {M11:1 M12:-0} {M21:-0 M22:1} {M31:0 M32:0} }

 

 

병렬 루프 취소는 CancellationTokenSource를 통해 취소 가능하다. 병렬 루프를 취소는 기능이 필요할 때가 많으며 루프 취소와 중지는 다르며, 루프 중지는 루프의 내부에서 일어나며, 취소는 루프의 외부에서 일어난다.

using System.Numerics;

void RotateMatrices(IEnumerable<Matrix3x2>matrices, float degrees,
    CancellationToken token)
{
    Parallel.ForEach(matrices,
        new ParallelOptions { CancellationToken = token },
        matrix => matrix.Rotate(degrees));
}

주의 해야 할 부분은 각 병렬 작업이 다른 스레드에서 실행 중일 수 있어서 모든 공유 상태를 보호해야한다.

더보기

주의해야 할 점

“각 병렬 작업이 다른 스레드에서 실행 중일 수 있어서 모든 공유 상태를 보호해야 한다.”

  • 병렬 루프는 여러 스레드가 동시에 실행되므로, 공유 자원(공유 변수, 리스트, 파일, DB 연결 등) 을 동시에 접근하면 데이터 경합(Race Condition) 이 발생할 수 있다.
  • 예를 들어, RotateMatrices 안에서 공유 리스트에 결과를 추가한다면:이 코드는 여러 스레드가 동시에 접근하기 때문에 List<T> 내부 상태가 깨질 수 있음.
  •  
    results.Add(rotatedMatrix);
  • 따라서 다음과 같은 기법을 써야 한다:
    1. lock:
 
lock(mutex)
{
    results.Add(rotatedMatrix);
}

2. 스레드 안전 컬렉션 사용 (ConcurrentBag<T>, ConcurrentQueue<T> 등).

3. 스레드 로컬 저장소 (ThreadLocal<T> 또는 Parallel.ForEach의 로컬 초기화/종료 델리게이트).


요약

  • 취소: CancellationToken 은 루프 외부에서 Cancel() 호출 → 내부는 토큰을 감시하고 OperationCanceledException 으로 중단.
  • 병렬 실행: 각 반복은 다른 스레드에서 실행되므로 공유 자원은 반드시 동기화 필요.
  • Rotate 예제는 개념 설명용이므로 실제 Matrix3x2 사용 시에는 Matrix3x2.CreateRotation 같은 팩토리 메서드 활용.

 

//주의 : 가장 효율적인 구현 방법은 아님
// 공유 상태를 보호하는 잠금(lock)의 사용법을 보여주는 예제
using System.Numerics;

int InvertMatrices(IEnumerable<Matrix3x2> matrices)
{
    object mutex = new object();
    int nonInvertibleCount = 0;
    Parallel.ForEach(matrices, matrix =>
    {
        if (matrix.IsInvertible)
        {
            matrix.Invert();
        }
        else
        {
            lock (mutex)
            {
                ++nonInvertibleCount;
            }
        }
    });
    return nonInvertibleCount;
}

상기 예제는 각 행렬을 역변환하고, 역변환할 수 없는 행렬의 수를 센다.

 

Parallel.ForEach 메서드는 일련의 값을 병렬로 처리 할 수 있으며 비슷한 병렬 처리 솔루션으로는 PLINQ(병렬 LINQ)가 있다.

 

LINQ와 비슷한 문법을 지닌 PLINQ는 Parallel과 거의 동일 기능을 제공한다. 

PLINQ와 Parrallel의 차이:

PLINQ: 컴퓨터의 모든 코어를 사용할 수 있다고 가정.

Parallel: CPU 상황에 따라 동적으로 대응.

 

Parallel.ForEach는 병렬 버전의 foreach 루프이다. Parallel 클래스는 for 루프를 병렬로 처리 할 수 있는 Parallel.For 메서드 또한 지원한다.

Parallel.For는 같은 인덱스를 사용하는 데이터 배열이 여러 개 일 때 유용하다.

 

더보기
  • Parallel.ForEach
    • List<T>, Dictionary<TKey, TValue>, IEnumerable<T> 같은 컬렉션 반복 처리할 때 자연스럽게 사용
    • 예: 행렬 리스트 순회, 파일 목록 처리, 문자열 컬렉션 가공
  • Parallel.For
    • 인덱스 기반 데이터 접근이 필요한 경우 유용
    • 예: 2개 이상의 배열을 같은 인덱스로 접근할 때
       
Parallel.For(0, array1.Length, i =>
{
    result[i] = array1[i] + array2[i];
});

정리

  • 기능적으로 **둘 다 "병렬 루프"**라서 상황에 맞게 선택하면 된다.
  • foreach 스타일 코드라면 Parallel.ForEach 가 깔끔하고,
  • 인덱스 기반 배열 연산이면 Parallel.For 가 더 직관적이고 효율적이다.

 

4.2 병렬 집계

 

병렬 작업이 끝나면 결과를 집계해야 한다. (집계 : 값의 합산, 평균값 계산 등)

 

Parallel 클래스는 병렬 루프의 범위 안에 존재하는 변수인 로컬 값의 개념을 통해 집계를 지원한다.

루프의 바디는 동기화 없이 로컬 값을 직접 사용할 수 있다는 의미다.

더보기

보통 병렬 루프에서 어떤 값을 집계(aggregation) 하고 싶을 때, 단순히 전역 변수 하나를 두고 여러 스레드에서 동시에 더하면 경쟁 조건(Race Condition) 때문에 문제가 생긴다

예:

 
 
int sum = 0;
Parallel.For(0, 1000, i =>
{
    sum += i; // 동시에 여러 스레드가 접근 → 데이터 깨짐
});

여기서 sum 은 공유 상태이기 때문에 동기화(lock 등)가 필요하다.


 로컬 값(Local state) 개념

Parallel.For 와 Parallel.ForEach 는 스레드마다 독립된 로컬 변수를 둘 수 있는 기능을 제공한다

즉:

  • 각 스레드가 자기만의 local value 를 가지고 연산 → 동기화 필요 없음
  • 모든 루프가 끝난 뒤에 최종적으로 합쳐서(aggregate) 하나의 결과를 만든다.

 코드 예제 (집계 with 로컬 값)

 
int total = 0;

Parallel.For(0, 1000,
    // 1. 로컬 초기화 (스레드별 독립 변수 초기값)
    () => 0,
    // 2. 루프 본문 (i: 반복 인덱스, state: 루프 상태, local: 스레드별 로컬 값)
    (i, state, local) =>
    {
        local += i; // 스레드 로컬 변수만 건드림 → 동기화 불필요
        return local;
    },
    // 3. 로컬 값을 최종 집계
    localResult =>
    {
        Interlocked.Add(ref total, localResult);
    });

Console.WriteLine($"합계: {total}");

 정리

  • 로컬 값(Local state) = 각 스레드가 병렬 루프 안에서 독립적으로 사용하는 변수
  • 루프 내부에서는 이 로컬 값만 사용하므로 lock 없이 안전하게 연산 가능
  • 루프가 끝난 뒤에 Interlocked 나 lock 등을 이용해서 최종 집계

 

루프는 각 로컬 결과를 집계할 준비가 끝나면 localFinally 대리자를 사용해서 집계를 수행한다.

 

localFinally 대리자가 최종 결과가 들어 있는 변수에 접근하려면 동기화가 필요하다는 점에 주의하자.

int ParallelSum(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;
    Parallel.ForEach(source: values,
        localInit: () => 0,
        body: (item, state, localValue) => localValue + item,
        localFinally: localValue =>
        {
            lock (mutex)
            {
                result += localValue;
            }
        });
    return result;
}

 

PLINQ는 Parallel 클래스보다 더 자연스러운 집계를 지원한다.

int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

 

PLINQ는 Sum 처럼 집계에 많이 쓰이는 연산자를 기본적으로 지원하기 때문에 아주 간단하게 작성했다.

 

PLINQ는 Aggregate 연산자 통해 제네릭의 집계도 지원한다.

int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item);
}

 

Parallel 클래스를 사용 중인게 아니라면 PLINQ의 장점인 집계기능 표현력 좋은 점 그리고 간결한 코드를 이점으로 PLINQ 병렬 처리를 사용 해보자. (4.5절 PLINQ 기초 설명)

 

 

4.3 병렬 호출

void ProcessAray(double[] array)
{
    Parallel.Invoke(
        () => ProcessPartialArray(array, 0, array.Length / 2),
        () => ProcessPartialArray(array, array.Length / 2, array.Length)
        );
}

void ProcessPartialArray(double[] array, int start, int end)
{
    for (int i = start; i < end; i++)
    {
        array[i] = Math.Sqrt(array[i]); // cpu 집약적인 처리
    }
}

 

서로 독립적인 여러 메서드를 병렬로 호출하려 하며, Parallel 클래스에는 Invoke 멤버 메서드를 통해 해결 할 수 있으며 예제는 배열을 반으로 나눈 뒤 각 반쪽을 독립적으로 처리하는 예제다.

 

실제 실행해 보기 전까지 호출 횟수를 알 수 없다면 다음과 같이 Parallel.Invoke 메서드로 대리자의 배열을 전달 할 수 있다.

void DoAction20Times(Action action)
{
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(actions);
}

 

Parallel.Invoke는 간단한 병렬 호출에 사용 할 수 있지만 완벽한 해결책은 아니다.

입력 데이터의 각 항목에 작업을 적용하려면 Parallel.ForEach를 사용해야 하고, 각 작업이 일종의 출력을 생성한다면 PLINQ를 사용해야 한다.

 

4.4 동적 병렬 처리

TPL(작업 병렬 라이브러리)의 핵심은 Task 형식이다. Parallel 클래스와 PLINQ는 강력한 Task 형식을 편리하게 쓸 수 있게 감싼 래퍼일 뿐이다.

동적 병렬 처리가 필요하다면 Task 형식을 직접 사용하는게 가장 쉽다.

 

이진트리의 구조는 실행 전에는 알 수 없어서 동적 병렬 처리 예제로 적당하다.

아래 예제에서는 부모 노드를 먼저 처리 한 뒤 자식 노드를 처리한다고 가정한다.  

Tranverse 메서드는 현재 노드를 처리하고 노드 아래에 있는 브랜치 마다 하나씩, 총 2개의 하위 작업을 생성한다.

ProcessTree 메서드는 최상위 레벨의 부모 작업을 생성하고 작업의 완료를 기다리는 방식으로 처리를 시작한다.

 

void Traverse(Node current)
{
    DoExpensiveActionOnNode(current);
    if (current.Left != null)
    {
        Task.Factory.StartNew(
            () => Traverse(current.Left),
        CancellationToken.None,
        TaskCreationOptions.AttachedToParent,
        TaskScheduler.Default);
    }
    if (current.Right != null)
    {
        Task.Factory.StartNew(
             () => Traverse(current.Right),
        CancellationToken.None,
        TaskCreationOptions.AttachedToParent,
        TaskScheduler.Default);
    }
}

void ProcessTree(Node root)
{
    Task task = Task.Factory.StartNew(
             () => Traverse(root),
        CancellationToken.None,
        TaskCreationOptions.None,
        TaskScheduler.Default);
    task.Wait();
}

AttatchedToParent 플래그는 각 브랜치의 Task와 부모 노드의 Task를 연결 했음을 나타낸다. 이렇게 하여 Task 인스턴스 사이에 트리 노드의 부모 자식 관계와 동일한 부모 자식 관계를 만든다. 부모 작업은 자신의 대리자를 실행한 뒤에 자식 작업의 완료를 기다린다. 자식 작업에서 발생한 예외는 자식 작업에서 부모 작업으로 전파된다. 따라서 ProcessTree는 루트 노드의  Task에서 Wait를 한 번 호출하여 모든 트리의 작업을 기다릴 수 있다.

 

더보기

TaskCreationOptions.AttachedToParent를 사용하면 트리 탐색의 부모–자식 구조와 동일하게 Task 간에도 부모–자식 관계가 맺어진다.

즉:

  • 부모 Task가 실행되면서 자식 Task를 AttachedToParent 옵션으로 생성하면,
  • 부모 Task는 자신의 delegate 실행이 끝난 후에도 자식 Task들이 모두 끝날 때까지 완료 상태로 바뀌지 않는다.
  • 자식 Task에서 발생한 예외는 부모 Task로 전파된다.

코드 동작 흐름

  1. ProcessTree
    • 루트 노드에서 시작하는 Task 하나를 만든다.
    • task.Wait()를 호출 → 루트 Task가 끝날 때까지 블록.
  2. Traverse(Node current)
    • 현재 노드에서 DoExpensiveActionOnNode 실행.
    • Left/Right가 있으면 Task.Factory.StartNew로 각각 자식 노드 탐색 Task를 생성.
    • 이때 AttachedToParent 옵션 때문에 자식 Task가 부모 Task에 붙는다.
  3. Task 계층 구조
    • 루트 Task → Left/Right 자식 Task → 그 하위 Left/Right … 식으로, 실제 Tree와 동일한 Task 트리 구조가 생긴다.
    • ProcessTree에서 task.Wait()를 하면 모든 하위 Task까지 완료될 때까지 기다리는 효과가 난다.

AttachedToParent 없는 경우와의 차이

  • 없는 경우:
    • 부모 Task는 자식 Task를 만들고 바로 끝난다.
    • task.Wait()는 루트 Task만 기다릴 뿐, 자식들이 다 끝나기 전에 리턴할 수 있다.
    • 예외도 부모로 전파되지 않고 TaskScheduler.UnobservedTaskException으로 흘러갈 수 있다.
  • 있는 경우:
    • 부모 Task는 자식 Task들이 끝날 때까지 완료되지 않는다.
    • 따라서 task.Wait() 한 번이면 전체 트리 탐색이 끝날 때까지 안전하게 기다릴 수 있다.
    • 예외도 부모 Task로 모아진다.

요약

  • AttachedToParent 덕분에 트리 구조 = Task 구조가 된다.
  • ProcessTree에서 루트 Task 하나만 기다려도 전체 트리가 처리된다.
  • 예외 처리도 부모로 전파되어 관리가 쉽다.

 

부모 자식등의 관계가 없는 상황이면 연속 작업(continuation)을 사용하여 다른 작업 뒤에 실행 할 작업을 예약 할 수 있다.

연속작업이란 원래 작업을 완료 했을 때 실행하는 별도의 작업이다.

using System.Diagnostics;

Task task = Task.Factory.StartNew(
    () =>
     Thread.Sleep(2000), // Simulate some work
        CancellationToken.None,
        TaskCreationOptions.None,
        TaskScheduler.Default
        );
Task continuation = task.ContinueWith(
    t => Trace.WriteLine("Task completed"),
    CancellationToken.None,
    TaskContinuationOptions.None,
    TaskScheduler.Default);

 

StartNew와 ContinueWith이 사용할 TaskScheduler는 명시적으로 지정하는 편이 좋다.

 

작업을 부모 자식관계로 정리하는 방법은 이와 같이 필수는 아니나, 동적 병렬 처리에서 흔히 쓰이는 방법이다.

스레드로부터 안전한(thread-safe) 컬렉션에 새로운 작업을 모두 저장한 뒤에 TaskWaitAll을 사용해 모든 작업이 완료할 때까지 기다릴 수 있다.

# 병렬 처리에서의 Task 사용법과 비동기 처리에서의 Task 사용법은 완전히 다름

동시성 프로그래밍에서 Task 형식의 목적

1) 병렬 작업 : Task.Wait, Task.Result, Task.WaitAll, Task.WaitAny 처럼 작업을 차단하는 멤버 메서드를 사용할 수 있다.

AttachedToParent를 사용해 작업 사이에 부모 - 자식 관계를 만들 수 있다.

병렬 작업은 Task.Run이나 Task.Factory.StartNew로 생성해야 한다.

 

2) 비동기 작업: 작업을 차단하는 멤버 메서드를 사용하지 말고, Await, Task.WhenAll(), Task.WhenAny를 사용한다.

비동기 작업은 AttatchedToParent를 사용할 수 없으나, 다른 작업을 대기하는 방식으로 일종의 암시적인 부모 - 자식 관계를 만들 수 있다.

 

4.5 PLINQ

PLINQ는 병렬 처리를 지원할 수 있게 확장한 LINQ(풀 기반, 일련의 데이터를 처리) 다.

 

PLINQ는 입력 시퀀스로 출력 시퀀스를 만들어 내는 스트리밍에 잘 맞는다.

IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().Select(value => value * 2);
}

 

상기 예제 결과는 순서와 상관없이 만들어 질 수 있다. 그게 PLINQ의 기본 동작 방식이다. 

 

결과가 만들어지는 순서를 다음 예제 처럼 AsOrdered 메서드로 지정 가능하다.

IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().AsOrdered().Select(value => value * 2);
}

 

병렬로 합계를 구하는 예제는 다음과 같다.

int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

 

 

 

집계 수행 또는 시퀀스를 다른 시퀀스로 변환하려면 Parallel 보단 PLINQ 코드가 더 간단하다.

Parallel 클래스는 PLINQ보단 시스템의 다른 프로세스에 더 친화적이다.

특히 서버에서 병렬 처리를 수행해야한다면 Parallel 클래스를 사용하자,.

더보기

Parallel vs PLINQ 차이

1. PLINQ (Parallel LINQ)

  • LINQ 쿼리를 병렬화한 것.
  • 데이터 집계, 변환, 필터링 같은 “쿼리형 데이터 처리”에 강점.
  • 코드가 간결:  Parallel.ForEach로 같은 걸 하려면 로컬 집계 + 최종 병합 코드 필요.
 
var result = numbers.AsParallel()
                    .Where(n => n % 2 == 0)
                    .Select(n => n * n)
                    .Sum();
  • 단점: 내부적으로 스레드 관리가 추상화되어 있어 세밀한 제어가 어렵다.

2. Parallel 클래스 (Parallel.For, Parallel.ForEach)

  • 루프 기반 병렬 처리를 직접 제어할 수 있음.
  • I/O, CPU 바운드 연산, 단순 반복 최적화 같은 시나리오에 강함.
  • PLINQ보다 시스템 자원(스레드 풀, 다른 프로세스) 에 대한 배려가 더 큼 → “친화적이다”라는 말은 이걸 뜻함.
  • 예: 서버 환경에서 다른 서비스들과 같이 돌 때, PLINQ보다 리소스 경합을 덜 일으키는 경향.

왜 서버에서는 Parallel이 권장될까?

  • 서버 환경 = 단일 프로세스만 도는 게 아니라 여러 서비스/프로세스가 동시에 자원을 나눠 씀.
  • Parallel 클래스는 Task Parallel Library (TPL) 기반으로, 스레드 풀을 활용하면서 적절히 work stealing, 로드 밸런싱을 해서 CPU를 효율적으로 씀.
  • PLINQ는 데이터 질의에 최적화되어 있어서, 고성능 데이터 처리에는 좋지만 서버 자원 친화성 측면에서는 Parallel보다 “욕심을 부릴 수” 있음.

 

 정리

  • 데이터 변환/집계 → PLINQ (AsParallel) 쓰면 코드가 더 짧고 가독성 ↑
  • 범용 반복/병렬 처리 (특히 서버 사이드) → Parallel.For, Parallel.ForEach 권장
  • PLINQ = 코드 간단, 데이터 질의 지향
  • Parallel = 자원 친화적, 시스템 제어 지향

 

 

PLINQ는 Where, Select, Sum, Average, 제네릭인 Aggregate 같은 집계까지 포함해서 다양한 연산자의 병렬 버전을 제공한다.

병렬로 실행하면 더 좋을 수 있는 LINQ코드는 PLINQ를 사용하자.

 

+ Recent posts