2021-04-08
Pythonでマルチスレッド並列処理を行う

マルチスレッドのメリット:並列化で速度改善

複数の処理を同時に実行することによって繰り返し処理全体にかかる時間を短縮することができます。

ただ、マルチスレッドを使えば計算資源を多く利用できるようになるというわけではありません。なので、計算量のある処理を並行して行うというよりは、IOやHTTP APIコールのオーバヘッドで計算していない時間が発生するような処理のほうが向いています。例えば1秒まつという処理をする関数を10回実行するとき、逐次実行していけばおよそ十秒かかりますが、10スレッドを準備してそれぞれが独立に1秒待つことで全体としてはおよそ1秒で終えることができます。

ただし特別な制御をしない限りは、並列に行われる仕事の順序を決めることはできませんので、その辺りの要求がある場合には単純な逐次実行が優れているでしょう。

マルチスレッドによって時間が早くなったかをみるための時間の測定にはtimeモジュールを使います。

import time

time_start = time.time()

処理

print(time.time() - time_start)

マルチスレッドそのものはthreadingモジュールにあるThreadクラスを活用して作成します。

もっとも単純な並列処理

もっとも単純な「1秒待つ」処理を複数スレッドを利用して並列に実行し、実行時間が実際に短縮されていることを確認します。

def wait_one_sec(i):
    time.sleep(1)

並列化の全体像は以下の通りです。最後に個々のスレッドの終了を待ちます(Thread.join)。

threads = []

for i in データ:
    thread = threading.Thread(target=データを利用した仕事, args=(i,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

こちらで先ほどの一秒まつ関数を仕事としてマルチスレッドありなしを試したところ以下のようになりました。予想通り本来なら10秒強かかる仕事を全体としては1秒強で完了できます。

$ python3 basic-time-measure.py 
マルチスレッドなし:  10.012403964996338
マルチスレッドあり:  1.005929708480835

一つのキューからデータを取り出していく

スレッド間である程度情報を共有してスレッド全体をある程度制御したい場合には(例えば全体のスレッド数を制限したい場合)、キューというデータ構造を利用することで実現できます。

  1. キューを準備する
  2. キューの先頭に対して処理する関数をつくる
  3. キューと仕事する関数を使ってスレッドを作成する
  4. スレッドを起動してキューにデータを追加する

仕事する関数は、データではなくキューを受け取り、関数のなかでキューからデータを取り出すようにします。

def wait_one_sec(queue_obj):
    item = queue_obj.get()
    time.sleep(1)
    queue_obj.task_done()

参考:キューのAPI

キュー構造はqueueモジュールにあるqueue.Queueによって作成できます。

  • Queue.put: キューの末尾にデータを追加
  • Queue.get: キューの先頭データを取得
    • デフォルトではキューにデータがない場合、追加されるまでブロックする
  • Queue.join: キューにあるデータがすべて取り除かれて処理が完了するまでブロックする
    • Queue.task_done: Queue.getで取り出したデータについて処理が完了したことを示す

キューを利用するマルチスレッドの全体像

スレッドの関数がキューを受け取るようになるので、受け渡すデータもキューになります。またすべての仕事を完了させたいときにはQueue.joinですべてのキュー上の要素にtask_doneが呼ばれるのを待ちます。

対象データのキュー = queue.Queue()

for i in data:
    thread = threading.Thread(target=仕事をする関数, args=(対象データのキュー,))
    thread.start()
    q.put(i)

q.join()

ここに先ほどの1秒待つ関数をあてがったもので実際に速度を計測すると、1秒ほどで全処理が完了し、十分に並列処理ができていることが分かります。

$ python3 use-queue.py 
1.0044941902160645

同時に処理するスレッド数を制限する

スレッド数が制限範囲を超えているときに、キューにある処理が完了するまでブロック(join)するとスレッド数を制限して並列処理をすることができます。もちろん無制限にスレッドを作成できたときに比べると全体としては遅くなりますが、2スレッドつかえば1スレッドだけでやった時の半分の時間で、$n$スレッドつかえば、全体としては$1/n$の時間で仕事を終えることができます。

全体像はこのようになります。すでに本体のスレッドが生成されているので、threading.active_countの返り値は自分で明示的に作成したスレッド数よりも1大きくなることに注意します。


対象データのキュー = queue.Queue(最大スレッド数)

for i in data:
    thread = threading.Thread(target=仕事をする関数, args=(対象データのキュー,))
    thread.start()
    q.put(i)

    if threading.active_count() >= 最大スレッド数 + 1:
        q.join()

同じように1秒まつメソッド10回を最大スレッド数ごとに実行した結果です。

$ python3 use-queue-maximum.py 
サイズ1:  10.020220518112183
サイズ2:  5.010011434555054
サイズ5:  2.0056521892547607

かかる時間がつかえたスレッド数で割った値になることが確認できます。

参考にさせていただいた資料

分かりやすくまとまっていてマルチスレッドの基本的な仕組みと実装を把握するのに最適です。

yohhoyさんの回答がキューを使ったマルチスレッド処理の実装を理解するのに役立ちました。