tkm2261's blog

研究員(OR屋) → データ分析官 → MLエンジニア → ニートがデータ分析諸々書いてます

道具としてのCython

皆様tkm2261です。今日は道具としてのCythonと題して、

使うことに特化してCythonの解説をしたいと思います。

きっかけはKaggle

最近まで参加していたInstacart Market Basket Analysis | Kaggle

どうしても高速化したい処理があり実装しました。

Numbaも良いのですが、速くなるときとならないときがあり、JITよりもコンパイルが速いのは自明なのでCythonにしました。

Cythonを使うとき

Cythonはコードの可読性を下げてしまうので、下記のような事を満たすケースか考えた方が良いです。

  • 処理が切り出せる
  • numpy配列の処理(C言語で書けって言われたら書けるレベル)
  • numpyの関数では表現できない。(配列をfor文でアクセスしないといけない)
  • 呼び出し回数が数千回以上

今回のKaggleではこれが当てはまったので紹介します。

FaronさんのF1最適化

このコンペではユーザ毎にアイテムを推薦してそのF1値の平均が指標でした。

つまり、予測値に対して閾値を設けて0-1にする必要があります。

Faronさんは「Ye, N., Chai, K., Lee, W., and Chieu, H. Optimizing F-measures: A Tale of Two Approaches. In ICML, 2012.」のアルゴリズムを実装してカーネルとして公開してくれました。

F1-Score Expectation Maximization in O(n²) | Kaggle

これが絶大な威力を発揮して、コンペはこれの公開後から別の様相を呈しました。

DP (Dynamic Programming)を含んだ実装はCythonの出番

このFaronさんの実装を見てもらえばすぐわかりますが、

いつくかDPテーブルを確保して、for文を回しています。これは超Cython向きです。

使い方 その1 『Cython実装』

実装自体はこの記事最後に付けたので参照下さい。ここではPythonとの相違点をメインに話します。

ファイルは.pyx

とくに考えずに、.pyではなく.pyxとしましょう。

cimport

CythonやNumpyのCライブラリに直接アクセスするためにcimportします

cimport cython
cimport numpy as np

ライブラリやヘッダにパスを通す必要がありますが、それは後ほど

型宣言

cdefを付けて使う前に宣言してください。

cdef int i, j, k

一番上でi,j,kをintで宣言するのは競プロっぽいですね。

実際、競プロ+CythonをやればPythonコードをかなり速く出来ると思います。

配列の場合は次元数と型を宣言してください。

cdef np.ndarray[double, ndim = 2] DP_C
cdef np.ndarray[long, ndim= 1] idx

np.float64で渡すと、Cython側ではlong型になるので注意しましょう。メモリにうるさい場合は呼び出しの方でも小さい整数型を使って下さい。

Cythonでは配列の次元、型指定が速度のキーとなるので必ず指定しましょう。

これやらないと使う意味ないです。

オプション指定

Pythonでは配列の長さを超えてアクセスするとエラーになったり、-1でアクセスすると一番後ろが取れたり便利な機能があります。

裏を返せば、そこのチェックに時間を使っているので、これを切ると高速になります。

@cython.boundscheck(False)
@cython.wraparound(False)

ただしC言語同様にSegmentation Faultを吐くようになります。

普通のPythonも書ける

今回はユーザ分並列したかったのでmultiprocessingも呼んでいます。

付録の感じで普通のPythonコードもpyxファイル内で呼ぶことが出来ます。

ただし、高速化は余りされないので注意しましょう。

※ CythonでGIL外してマルチスレッド処理も可能ですが、超面倒なので極力避けましょう

このf1_groupはlightgbmの評価関数として使える形になっており、これを使ってコンペではチューニングしていますた。

使い方 その2『コンパイル

ここではsetup.pyを使うコンパイルを紹介します。

pyximport使うのもありますが、呼び出し元でcythonを意識したくないのでsetup.pyで.soを作ります。

コンパイルするpyxファイルはutils.pyxとします。

setup.pyの書き方

from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
import numpy

setup(
    cmdclass={'build_ext': build_ext},
    ext_modules=[Extension("utils", ["utils.pyx"])],
    include_dirs=[numpy.get_include()],
    extra_compile_args=['-fopenmp'],
    extra_link_args=['-fopenmp'],
)

numpy.get_include()で動的に持ってくるのがミソです。

コンパイルの実行

下記を実行してすると、

python setup.py build_ext --inplace

というファイルが作成されます。これを実行したいフォルダに持っていきます。

使い方 その3『呼び出し』

作成されたsoファイルを実行したいフォルダに持ってきて普通にimportできます

from utils import f1_opt

soファイルのドット前の名前でimport出来ます。

速度検証

実際に1万人に対して並列せずに実行して速度を見てみます。

計算時間
Python実装 241.1秒
Cython実装 21.5秒

約12倍の高速化になっています。

使いすぎには注意ですが、Cythonは非常に強力なツールです。

付録: 実装 utils.pyx

スネークケースとかごっちゃになってますがコピペして持ってきたのですみません。。。

cimport cython
import numpy as np
cimport numpy as np
from sklearn.metrics import f1_score


@cython.boundscheck(False)
@cython.wraparound(False)
def f1_opt(np.ndarray[long, ndim=1] label, np.ndarray[double, ndim=1] preds):
    """ ユーザ毎の期待F1値最大化&その時の実際のF1値取得関数

    :param label np.ndarray: 0-1の正解ラベル 
    :param preds np.ndarray: 予測確率ラベル 
    """

    cdef int i, j, k, k1
    cdef double f1, score, f1None, pNone
    cdef long n = preds.shape[0]

    pNone = (1 - preds).prod()

    cdef np.ndarray[long, ndim= 1] idx = np.argsort(preds)[::-1]
    label = label[idx]
    preds = preds[idx]

    cdef np.ndarray[double, ndim = 2] DP_C = np.zeros((n + 2, n + 1), dtype=np.float)

    DP_C[0, 0] = 1.0
    for j in range(1, n):
        DP_C[0, j] = (1.0 - preds[j - 1]) * DP_C[0, j - 1]
    for i in range(1, n + 1):
        DP_C[i, i] = DP_C[i - 1, i - 1] * preds[i - 1]
        for j in range(i + 1, n + 1):
            DP_C[i, j] = preds[j - 1] * DP_C[i - 1, j - 1] + (1.0 - preds[j - 1]) * DP_C[i, j - 1]

    cdef np.ndarray[double, ndim = 1] DP_S = np.zeros((2 * n + 1,))
    cdef np.ndarray[double, ndim = 1] DP_SNone = np.zeros((2 * n + 1,))
    for i in range(1, 2 * n + 1):
        DP_S[i] = 1. / (1. * i)
        DP_SNone[i] = 1. / (1. * i + 1)

    score = -1
    cdef np.ndarray[double, ndim= 1] expectations = np.zeros(n + 1)
    cdef np.ndarray[double, ndim= 1] expectationsNone = np.zeros(n + 1)

    for k in range(n + 1)[::-1]:
        f1 = 0
        f1None = 0
        for k1 in range(n + 1):
            f1 += 2 * k1 * DP_C[k1][k] * DP_S[k + k1]
            f1None += 2 * k1 * DP_C[k1][k] * DP_SNone[k + k1]
        for i in range(1, 2 * k - 1):
            DP_S[i] = (1 - preds[k - 1]) * DP_S[i] + preds[k - 1] * DP_S[i + 1]
            DP_SNone[i] = (1 - preds[k - 1]) * DP_SNone[i] + preds[k - 1] * DP_SNone[i + 1]
        expectations[k] = f1
        expectationsNone[k] = f1None + 2 * pNone / (2 + k)

    if expectations.max() > expectationsNone.max():
        i = np.argsort(expectations)[n] - 1
        tp = label[:i + 1].sum()
        if tp > 0:
            precision = tp / (i + 1)
            recall = tp / label.sum()
            f1 = (2 * precision * recall) / (precision + recall)
        else:
            f1 = 0
    else:
        i = np.argsort(expectationsNone)[n] - 1
        tp = label[:i + 1].sum() if label.sum() != 0 else 1
        if tp > 0:
            precision = tp / (i + 2)
            recall = tp / max(label.sum(), 1)
            f1 = (2 * precision * recall) / (precision + recall)
        else:
            f1 = 0

    return f1

from multiprocessing import Pool


@cython.boundscheck(False)
@cython.wraparound(False)
def f1_group(np.ndarray[long, ndim=1] label, np.ndarray[double, ndim=1] preds, np.ndarray[long, ndim=1] group):
    """ ユーザ平均F1値取得関数

    :param np.ndarray label: 0-1の正解ラベル 
    :param np.ndarray preds: 予測確率ラベル
    :param  np.ndarray group: ユーザの切れ目(lightgbmのgroup定義に準拠)
    """
    cdef int i, start, end, j, s
    cdef double score = 0.
    cdef long m = group.shape[0]
    cdef long n = preds.shape[0]
    start = 0

    p = Pool()
    list_p = []
    for i in range(m):
        end = start + group[i]
        list_p.append(p.apply_async(f1_opt, (label[start:end], preds[start:end],)))
        start = end
    scores = [a.get() for a in list_p]
    p.close()
    p.join()
    return np.mean(scores)