tkm2261's blog

研究員(OR屋) → データ分析官 → MLエンジニア → ニート → 米国CS PhDが諸々書いてます

Kaggler Slack作りました

追記 2023/03/17

また変わっていました。

追記 2022/10/23

また変わっていました。

追記 2021/03/11

Slack側の仕様変更があり招待URLが期限切れになってました。

現在は下記のURLから入れますが、もし万が一期限切れになっていましたらTwitterのDMでお知らせ頂けると助かります。

join.slack.com

Kaggler-ja slack

Twitterでは拡散しましたが、検索で来る人もいそうなので宣伝

https://kaggler-ja.herokuapp.com/

オープンチャンネルなのでメール入れれば誰でも入れます

誰もチェックとかしてないので気軽にどうぞー

細かすぎて伝わらないLightGBM活用法 (callback関数)

皆様tkm2261です。この頃連投が続いてますが、

最近まで参加していたInstacart Market Basket Analysis | Kaggleで色々やったので残しておこうと思います。

このcallback関数は便利ですが、Kaggleなどでヘビーに使う人以外ここまでしないと思うので活躍するかは微妙なところです。。。

LightGBMのtrain関数を読み解く

xgboostもそうですが、lightgbmにもtrain()という関数がありLightGBMユーザはこれを使って学習を実行します。

scikit-learn APIも内部ではこの関数を呼んでいるので同じです。

この引数にcallbacksというのがあり、殆どのユーザは使っていないと思います。(私も今回初)

このcallbacksの活用法が今回のトピックになります。

train関数の実装を見てみると, 192行目ぐらいでBoostingを回しています。

LightGBM/engine.py at master · Microsoft/LightGBM · GitHub

# lightgbm/engine.py: 192
    for i in range_(init_iteration, init_iteration + num_boost_round):
        for cb in callbacks_before_iter:
            cb(callback.CallbackEnv(model=booster,
                                    params=params,
                                    iteration=i,
                                    begin_iteration=init_iteration,
                                    end_iteration=init_iteration + num_boost_round,
                                    evaluation_result_list=None))

        booster.update(fobj=fobj)

        evaluation_result_list = []
        # check evaluation result.
        if valid_sets is not None:
            if is_valid_contain_train:
                evaluation_result_list.extend(booster.eval_train(feval))
            evaluation_result_list.extend(booster.eval_valid(feval))
        try:
            for cb in callbacks_after_iter:
                cb(callback.CallbackEnv(model=booster,
                                        params=params,
                                        iteration=i,
                                        begin_iteration=init_iteration,
                                        end_iteration=init_iteration + num_boost_round,
                                        evaluation_result_list=evaluation_result_list))
        except callback.EarlyStopException as earlyStopException:
            booster.best_iteration = earlyStopException.best_iteration + 1
            evaluation_result_list = earlyStopException.best_score
            break

意外に思われるかもしれませんが、boostingのfor文はpython側で回ってます。

early stoppingもこっちで例外として書いてあるのでcallback関数さえかければループ毎にかなり動的に書くことが出来ます。

例えばearly stoppingの条件をより複雑にしたり、loggingをちゃんと仕込んで普通は標準出力に吐かれて保存が難しい学習の様子をファイルに残すことが出来ます。

train関数に渡したcallback関数はcallbacks_after_iterに渡るのでこちらをこれから見ていきます。

callback関数の仕様

custom objectiveやcustom metricと異なりcallbackの仕様は英語でもドキュメントがありません。

ただ実装は素直なので見ていきます。

文字で説明してもアレなので、先に私のコンペの実装を見せます。

def get_pred_metric(pred, dtrain):
    """予測値無理やり取るmetric
    """
    return 'pred', pred, True

def callback(env):
    """ligthgbm callback関数(instacartコンペ)
   
    :param lightgbm.callback.CallbackEnv env: 学習中のデータ
    """

    # 10回毎に実行
    if (env.iteration + 1) % 10 != 0:
        return

    clf = env.model                 # 学習中モデル
    trn_env = clf.train_set         # 学習データ (ラベルだけが入ってる※後述)
    val_env = clf.valid_sets[0]     # 検証データ (ラベルだけが入ってる※後述)

    # 無理やり予測値をとる
    preds = [ele[2] for ele in clf.eval_train(get_pred_metric) if ele[1] == 'pred'][0]
    # ラベル取得Cython用に型を指定
    labels = trn_env.get_label().astype(np.int)

    # ユーザ毎のしきい値で予測を0-1にしてデータ毎に正解なら1, 不正解なら0を返す関数
    # (list_idxはgroupでグローバルでアクセス)
    res = f1_group_idx(labels, preds, list_idx).astype(np.bool)
   
    # 間違ってるデータのウェイトを上げるて正解のデータのウェイトを下げる
    weight = trn_env.get_weight()
    if weight is None:
        weight = np.ones(preds.shape[0])
    weight[res] *= 0.8
    weight[~res] *= 1.25

    trn_env.set_weight(weight)

    # 検証データのmetricを計算
    preds = [ele[2] for ele in clf.eval_valid(get_pred_metric) if ele[1] == 'pred'][0]
    labels = val_env.get_label().astype(np.int)
    t = time.time()
    res = f1_group(labels, preds, list_idx)
    sc = np.mean(res)

    logger.info('cal [{}] {} {}'.format(env.iteration + 1, sc, time.time() - t))

受け取る引数はひとつ (lightgbm.callback.CallbackEnv)

ただの名前付きタプルです。lightgbmユーザなら名前で大体中身が分かるはずです。

唯一よく知らないのがmodel(Booster)かと思うので、この後見ていきます。

begin_iterationとend_iterationがあるのはinit_model指定して学習を途中からやる場合があるからです。

CallbackEnv = collections.namedtuple(
    "LightGBMCallbackEnv",
    ["model",
     "params",
     "iteration",
     "begin_iteration",
     "end_iteration",
     "evaluation_result_list"])

LightGBM/callback.py at master · Microsoft/LightGBM · GitHub

データのアクセスはAPI経由

callback関数最大の難関は、lightgbmの学習データをpythonから直接アクセス出来ないことです。

なぜかというと、学習はCでやるためCのオブジェクトでデータを持ってるためです。

メモリ的にPython側でも持つわけには行かないので仕方ありません。

もっと良いやり方があるかもしれませんが、学習データにはcustom metricでアクセスするのが一番楽そうでした。

なのでget_pred_metricみたいな一切計算しないで予測値を返すだけのcustom metricを実装して無理やり取得します。

preds = [ele[2] for ele in clf.eval_train(get_pred_metric) if ele[1] == 'pred'][0]
preds = [ele[2] for ele in clf.eval_valid(get_pred_metric) if ele[1] == 'pred'][0]

Booster.eval_trainとBooster.eval_testで学習データと検証データのそれぞれの予測値がとれます。

データは複数渡せるので、最後に先頭のだけ取ります。

ラベルは普通にget_label()でとれます。

その他の事はBoosterの実装を見てください。

LightGBM/basic.py at master · Microsoft/LightGBM · GitHub

あとは好きに実装

私の場合は、metricの計算が遅いので10回反復毎にしたり、反復毎にデータのウェイトを変えたり、ログに書いたりといった活用をしました。

ただ反復毎にデータのウェイトを変えるのはC側の変更も必要なので注意下さい。

自前early stoppingのやり方

↑のboosting反復の実装みれば分かる通り、lightgbm.callback.EarlyStopExceptionを上げるだけです。

callback関数内で好きに実装してraiseしましょう。

class EarlyStopException(Exception):
    """Exception of early stopping.
    Parameters
    ----------
    best_iteration : int
        The best iteration stopped.
    """
    def __init__(self, best_iteration, best_score):
        super(EarlyStopException, self).__init__()
        self.best_iteration = best_iteration
        self.best_score = best_score

LightGBM/callback.py at master · Microsoft/LightGBM · GitHub

ただ未検証なので、ちゃんと巻き戻るかとか検証したら教えて下さい。。。

一応書いたけど。。。

ここまで学習を制御すること無いので普通使わないですが、忘れそうなので備忘で記事にしました。

道具としてのCython

皆様tkm2261です。今日は道具としてのCythonと題して、

使うことに特化してCythonの解説をしたいと思います。

きっかけはKaggle

最近まで参加していたInstacart Market Basket Analysis | Kaggle

どうしても高速化したい処理があり実装しました。

Numbaも良いのですが、速くなるときとならないときがあり、JITよりもコンパイルが速いのは自明なのでCythonにしました。

Cythonを使うとき

Cythonはコードの可読性を下げてしまうので、下記のような事を満たすケースか考えた方が良いです。

  • 処理が切り出せる
  • numpy配列の処理(C言語で書けって言われたら書けるレベル)
  • numpyの関数では表現できない。(配列をfor文でアクセスしないといけない)
  • 呼び出し回数が数千回以上

今回のKaggleではこれが当てはまったので紹介します。

FaronさんのF1最適化

このコンペではユーザ毎にアイテムを推薦してそのF1値の平均が指標でした。

つまり、予測値に対して閾値を設けて0-1にする必要があります。

Faronさんは「Ye, N., Chai, K., Lee, W., and Chieu, H. Optimizing F-measures: A Tale of Two Approaches. In ICML, 2012.」のアルゴリズムを実装してカーネルとして公開してくれました。

F1-Score Expectation Maximization in O(n²) | Kaggle

これが絶大な威力を発揮して、コンペはこれの公開後から別の様相を呈しました。

DP (Dynamic Programming)を含んだ実装はCythonの出番

このFaronさんの実装を見てもらえばすぐわかりますが、

いつくかDPテーブルを確保して、for文を回しています。これは超Cython向きです。

使い方 その1 『Cython実装』

実装自体はこの記事最後に付けたので参照下さい。ここではPythonとの相違点をメインに話します。

ファイルは.pyx

とくに考えずに、.pyではなく.pyxとしましょう。

cimport

CythonやNumpyのCライブラリに直接アクセスするためにcimportします

cimport cython
cimport numpy as np

ライブラリやヘッダにパスを通す必要がありますが、それは後ほど

型宣言

cdefを付けて使う前に宣言してください。

cdef int i, j, k

一番上でi,j,kをintで宣言するのは競プロっぽいですね。

実際、競プロ+CythonをやればPythonコードをかなり速く出来ると思います。

配列の場合は次元数と型を宣言してください。

cdef np.ndarray[double, ndim = 2] DP_C
cdef np.ndarray[long, ndim= 1] idx

np.float64で渡すと、Cython側ではlong型になるので注意しましょう。メモリにうるさい場合は呼び出しの方でも小さい整数型を使って下さい。

Cythonでは配列の次元、型指定が速度のキーとなるので必ず指定しましょう。

これやらないと使う意味ないです。

オプション指定

Pythonでは配列の長さを超えてアクセスするとエラーになったり、-1でアクセスすると一番後ろが取れたり便利な機能があります。

裏を返せば、そこのチェックに時間を使っているので、これを切ると高速になります。

@cython.boundscheck(False)
@cython.wraparound(False)

ただしC言語同様にSegmentation Faultを吐くようになります。

普通のPythonも書ける

今回はユーザ分並列したかったのでmultiprocessingも呼んでいます。

付録の感じで普通のPythonコードもpyxファイル内で呼ぶことが出来ます。

ただし、高速化は余りされないので注意しましょう。

※ CythonでGIL外してマルチスレッド処理も可能ですが、超面倒なので極力避けましょう

このf1_groupはlightgbmの評価関数として使える形になっており、これを使ってコンペではチューニングしていますた。

使い方 その2『コンパイル

ここではsetup.pyを使うコンパイルを紹介します。

pyximport使うのもありますが、呼び出し元でcythonを意識したくないのでsetup.pyで.soを作ります。

コンパイルするpyxファイルはutils.pyxとします。

setup.pyの書き方

from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
import numpy

setup(
    cmdclass={'build_ext': build_ext},
    ext_modules=[Extension("utils", ["utils.pyx"])],
    include_dirs=[numpy.get_include()],
    extra_compile_args=['-fopenmp'],
    extra_link_args=['-fopenmp'],
)

numpy.get_include()で動的に持ってくるのがミソです。

コンパイルの実行

下記を実行してすると、

python setup.py build_ext --inplace

というファイルが作成されます。これを実行したいフォルダに持っていきます。

使い方 その3『呼び出し』

作成されたsoファイルを実行したいフォルダに持ってきて普通にimportできます

from utils import f1_opt

soファイルのドット前の名前でimport出来ます。

速度検証

実際に1万人に対して並列せずに実行して速度を見てみます。

計算時間
Python実装 241.1秒
Cython実装 21.5秒

約12倍の高速化になっています。

使いすぎには注意ですが、Cythonは非常に強力なツールです。

付録: 実装 utils.pyx

スネークケースとかごっちゃになってますがコピペして持ってきたのですみません。。。

cimport cython
import numpy as np
cimport numpy as np
from sklearn.metrics import f1_score


@cython.boundscheck(False)
@cython.wraparound(False)
def f1_opt(np.ndarray[long, ndim=1] label, np.ndarray[double, ndim=1] preds):
    """ ユーザ毎の期待F1値最大化&その時の実際のF1値取得関数

    :param label np.ndarray: 0-1の正解ラベル 
    :param preds np.ndarray: 予測確率ラベル 
    """

    cdef int i, j, k, k1
    cdef double f1, score, f1None, pNone
    cdef long n = preds.shape[0]

    pNone = (1 - preds).prod()

    cdef np.ndarray[long, ndim= 1] idx = np.argsort(preds)[::-1]
    label = label[idx]
    preds = preds[idx]

    cdef np.ndarray[double, ndim = 2] DP_C = np.zeros((n + 2, n + 1), dtype=np.float)

    DP_C[0, 0] = 1.0
    for j in range(1, n):
        DP_C[0, j] = (1.0 - preds[j - 1]) * DP_C[0, j - 1]
    for i in range(1, n + 1):
        DP_C[i, i] = DP_C[i - 1, i - 1] * preds[i - 1]
        for j in range(i + 1, n + 1):
            DP_C[i, j] = preds[j - 1] * DP_C[i - 1, j - 1] + (1.0 - preds[j - 1]) * DP_C[i, j - 1]

    cdef np.ndarray[double, ndim = 1] DP_S = np.zeros((2 * n + 1,))
    cdef np.ndarray[double, ndim = 1] DP_SNone = np.zeros((2 * n + 1,))
    for i in range(1, 2 * n + 1):
        DP_S[i] = 1. / (1. * i)
        DP_SNone[i] = 1. / (1. * i + 1)

    score = -1
    cdef np.ndarray[double, ndim= 1] expectations = np.zeros(n + 1)
    cdef np.ndarray[double, ndim= 1] expectationsNone = np.zeros(n + 1)

    for k in range(n + 1)[::-1]:
        f1 = 0
        f1None = 0
        for k1 in range(n + 1):
            f1 += 2 * k1 * DP_C[k1][k] * DP_S[k + k1]
            f1None += 2 * k1 * DP_C[k1][k] * DP_SNone[k + k1]
        for i in range(1, 2 * k - 1):
            DP_S[i] = (1 - preds[k - 1]) * DP_S[i] + preds[k - 1] * DP_S[i + 1]
            DP_SNone[i] = (1 - preds[k - 1]) * DP_SNone[i] + preds[k - 1] * DP_SNone[i + 1]
        expectations[k] = f1
        expectationsNone[k] = f1None + 2 * pNone / (2 + k)

    if expectations.max() > expectationsNone.max():
        i = np.argsort(expectations)[n] - 1
        tp = label[:i + 1].sum()
        if tp > 0:
            precision = tp / (i + 1)
            recall = tp / label.sum()
            f1 = (2 * precision * recall) / (precision + recall)
        else:
            f1 = 0
    else:
        i = np.argsort(expectationsNone)[n] - 1
        tp = label[:i + 1].sum() if label.sum() != 0 else 1
        if tp > 0:
            precision = tp / (i + 2)
            recall = tp / max(label.sum(), 1)
            f1 = (2 * precision * recall) / (precision + recall)
        else:
            f1 = 0

    return f1

from multiprocessing import Pool


@cython.boundscheck(False)
@cython.wraparound(False)
def f1_group(np.ndarray[long, ndim=1] label, np.ndarray[double, ndim=1] preds, np.ndarray[long, ndim=1] group):
    """ ユーザ平均F1値取得関数

    :param np.ndarray label: 0-1の正解ラベル 
    :param np.ndarray preds: 予測確率ラベル
    :param  np.ndarray group: ユーザの切れ目(lightgbmのgroup定義に準拠)
    """
    cdef int i, start, end, j, s
    cdef double score = 0.
    cdef long m = group.shape[0]
    cdef long n = preds.shape[0]
    start = 0

    p = Pool()
    list_p = []
    for i in range(m):
        end = start + group[i]
        list_p.append(p.apply_async(f1_opt, (label[start:end], preds[start:end],)))
        start = end
    scores = [a.get() for a in list_p]
    p.close()
    p.join()
    return np.mean(scores)

PythonでCSVを高速&省メモリに読みたい

今日はPython (Pandas)で高速にCSVを読むことに挑戦したいと思います。

Kaggleに参加するたびに、イライラしていたので各実装の白黒はっきりさせようと思います。

R使いが羨ましいなぁと思う第一位がCSV読込が簡単に並列出来て速いことなので、

なんとかGILのあるPythonでも高速に読み込みたいと思います。

ただ、この検証ではコーディング量が多いものは検証しません。

CSV読込は頻出するので、フットワークの軽さが重要です。(オレオレライブラリ嫌い)

Pickleは早いけど。。。

Kaggleでは、大規模なCSVを高速に読みたいということが頻発します。

シリアライズだけなら、Pickleでのread/writeが爆速なのですが、

  • バイナリなので中身が確認できない
  • 一度メモリに全展開が必要

と欠点もあるため、なるべくCSVで管理したいところです。

見てみるとこんな感じ。

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.887 MiB    0.000 MiB   @profile
     9                             def main():
    10 4116.051 MiB 4027.164 MiB       df = pd.read_csv('train_data.csv')  # .values
    11 4116.051 MiB    0.000 MiB       with open('aaa', 'wb') as f:
    12 4116.051 MiB    0.000 MiB           pickle.dump(df, f, -1)
    13 4116.051 MiB    0.000 MiB       with open('aaa', 'rb') as f:
    14 4116.051 MiB    0.000 MiB           df = pickle.load(f)

pickleはもっとメモリ喰うと思っていたので、GNU版のtimeで調べると

$ /usr/bin/time -f "%M KB" python test.py
27827088 KB

約28GB… いやいやメモリ使いすぎでしょう。。。

個人的な感覚はオブジェクトサイズの2倍程度だと思ってたので驚きです。こいつはどうにかしないといけません。

皆様、memory_profilerは関数内部で消費されたメモリは表示されませんので気をつけて!

(実は最後の最後で気づいて全実験をやり直した。マジでmemory_profilerさん勘弁して下さい。)

結論はDask使おう!

今回の計測結果がこちらです。
読み込み後のオブジェクトが約2GBぐらいなのでそれを参考に見て下さい。

カッコが付いているところは計測したけど子プロセス分入ってるか自信がない箇所です。

実装 実行時間 memory_profile最大使用メモリ GNU版timeの最大使用メモリ
pandas.read_csv() 39.2s 4.0GB 6.3GB
pandas.read_csv() (dtype指定) 37.2s 2.0GB 3.2GB
pandas.read_csv() (gzip圧縮) 48.5s 4.0GB 6.3GB
numpy.genfromtxt() 4min 41s timeout 22.8 GB
pandas.read_csv() (chunksize指定 + multiprocessing) 43.6s 計測不可 (4.6GB)
pandas.read_csv() (chunksize指定) 40.6s 2.5GB 4.4GB
pandas.read_csv() (chunksize指定+GC) 40.8s 2.5GB 4.4GB
dask.dataframe.read_csv() (dask.multiprocessing.get) 16.3s 計測不可 (4.2GB)
ファイル分割(10ファイル・並列なし) 49.1s 2.3GB 4.4GB
ファイル分割(10ファイル・並列) 8.89s 計測不可 (4.2GB)
pickle 2.18s 2.1GB 28GB

計測した結果から言うと、daskを使うのが速くて実装が楽です! 、デフォルトread_csvはかなりメモリを使用します!

ファイル分割が一番効くのはそうなんですが、↑の結果は行での分割なのでKaggleとかの特徴量で管理したいときには微妙なんですよね。
daskは複数ファイル読込も対応しているので、2行で書けるdaskの記法に統一してしまっても良さげです。

16コアで2倍程度の高速化というのはちょっと物足りない気もしますが、1ファイルなのでしょうがないんですかね

列で分割した場合の結果も後述しますが、pd.mergeではcopy=Falseを指定した方が良いです。

検証環境

  • Ubuntu 16.04
  • c4.4xlarge (16 vCPU)
  • SSDストレージ
  • pandas==0.20.3
  • dask==0.15.0

当初はBash on Windows環境でやってたのですが、こちらはマルチスレッドでの挙動が怪しくてボツにしました。

データ

  • 1,000,000行, 263列
  • 2.6GB (GZIP圧縮後0.54GB)

速度検証

pandas.read_csv()

まずは王道のチェック

In [2]: %time tmp = pd.read_csv('train_data.csv')
CPU times: user 37.1 s, sys: 2.06 s, total: 39.2 s
Wall time: 39.2 s

この40秒ベンチマーク

pandas.read_csv() (dtype指定)

dtypeを指定してみた。

In [4]: %time tmp = pd.read_csv('train_data.csv', dtype=np.float32)
CPU times: user 36.7 s, sys: 564 ms, total: 37.2 s
Wall time: 37.2 s

若干速くなった。

pandas.read_csv() (gzip圧縮)

SSDのIO性能がボトルネックの可能性があるのでgzip圧縮もテスト

In [2]: %time tmp = pd.read_csv('train_data.csv.gz')
CPU times: user 47.5 s, sys: 984 ms, total: 48.5 s
Wall time: 48.5 s

gzipは速くはならないみたい

numpy.genfromtxt()

numpy.loadtxtは欠損値があると読み込めないので、こちらを検証

In [3]: %time tmp = np.genfromtxt('train_data.csv', delimiter=',', skip_header=1)
CPU times: user 4min 33s, sys: 7.98 s, total: 4min 41s
Wall time: 4min 41s

4分ぐらいかかっているので、pandasの実装の方が良いみたいですね。

pandas.read_csv() (chunksize指定 + multiprocessing)

実装

import numpy as np
import pandas as pd
from multiprocessing import Pool

def get_df(df):
    return df

p = Pool()
tmp = pd.concat(list(p.map(get_df, pd.read_csv('train_data.csv', chunksize=100000))), ignore_index=True)
p.close()
p.join()

結果

CPU times: user 39.4 s, sys: 3.9 s, total: 43.3 s
Wall time: 43.6 s

CPUもちゃんと全部動いておらず、この実装は駄目みたいですね。

やるならファイルを分割して並列読込が良さそう。

pandas.read_csv() (chunksize指定)

メモリが少ないときは使う実装なので一応確認

実装

import numpy as np
import pandas as pd
from multiprocessing import Pool

df = None

for tmp in  pd.read_csv('train_data.csv', chunksize=100000):
    if df is None:
        df = tmp
    else:
        df = df.append(tmp, ignore_index=True)

結果

In [1]: %time %run hoge.py
CPU times: user 31.7 s, sys: 8.94 s, total: 40.6 s
Wall time: 40.6 s

さほど時間が変わらず読み込めています。

pandas.read_csv() (chunksize指定 + GC)

さらにメモリに気を使ってGC入れると

import gc
import numpy as np
import pandas as pd
from multiprocessing import Pool

df = None

for tmp in  pd.read_csv('train_data.csv', chunksize=100000):
    if df is None:
        df = tmp
    else:
        df = df.append(tmp, ignore_index=True)
    del tmp
    gc.collect()
In [1]: %time %run test.py
CPU times: user 37.5 s, sys: 3.32 s, total: 40.8 s
Wall time: 40.8 s

ほぼ変わらない時間で実行出来ています。誤差レベルなのでメモリ無い時はGC入れときましょう

dask.dataframe.read_csv()

並列といえばdask! ということで試します。

import dask.dataframe as ddf
import dask.multiprocessing

df = ddf.read_csv('train_data.csv')
df = df.compute(get=dask.multiprocessing.get)
In [1]: %time %run test.py
CPU times: user 7.16 s, sys: 7.49 s, total: 14.6 s
Wall time: 16.3 s

CPUが全部動いて、倍ぐらいは速くなりました!

ただ16コアで倍なので、もうすこし贅沢を言いたい気もする

あとCHUNKSIZEを指定すると、上手く並列しなかったのでデフォルトで良さげです。

ファイル分割

100,000行ごとに10ファイルに分割して読込してみる

import glob
import numpy as np
import pandas as pd
from multiprocessing import Pool

def get_df(path):
    return pd.read_csv(path)

p = Pool()
tmp = pd.concat(p.map(get_df, glob.glob('tmp/*csv')), ignore_index=True)
p.close()
p.join()
In [1]: %time %run test.py
CPU times: user 1.73 s, sys: 1.92 s, total: 3.66 s
Wall time: 8.89 s

流石に速い。10倍ぐらい速くなってますね。

あと、本来ならコア数の倍数で分割するのが良さげです

メモリ使用量

ただ速ければいいというのは不公平なのでメモリも測ります。

特にpd.concatの瞬間に凄いメモリを使ってる可能性があります。

memory_profilerで見てみます

pandas.read_csv()

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.660 MiB    0.000 MiB   @profile
     9                             def main():
    10 4115.828 MiB 4027.168 MiB       tmp = pd.read_csv('train_data.csv')

普通にread_csvすると4GBほど使用

GNU版time計測だと、6.3GB

pandas.read_csv() (dtype指定)

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.680 MiB    0.000 MiB   @profile
     9                             def main():
    10 2109.586 MiB 2020.906 MiB       tmp = pd.read_csv('train_data.csv', dtype=np.float32)

64bit -> 32bitで2GBになり半分に減少

GNU版time計測だと3.2GB 同様ですね。

pandas.read_csv() (chunksize指定)

Line #    Mem usage    Increment   Line Contents
================================================
    11   88.633 MiB    0.000 MiB   @profile
    12                             def main():
    13   88.633 MiB    0.000 MiB       df = None
    14 2497.973 MiB 2409.340 MiB       for tmp in pd.read_csv('train_data.csv', chunksize=100000):
    15 2297.340 MiB -200.633 MiB           if df is None:
    16  492.543 MiB -1804.797 MiB               df = tmp
    17                                     else:
    18 2497.992 MiB 2005.449 MiB               df = df.append(tmp, ignore_index=True)

確かにchunkで読み込むとメモリ使用が減少してますね。

GNU版time計測だと4.4GB 一応素のread_csvよりは減っています。

pandas.read_csv() (chunksize指定 + GC)

Line #    Mem usage    Increment   Line Contents
================================================
    11   89.121 MiB    0.000 MiB   @profile
    12                             def main():
    13   89.121 MiB    0.000 MiB       df = None
    14 2300.328 MiB 2211.207 MiB       for tmp in pd.read_csv('train_data.csv', chunksize=100000):
    15 2300.328 MiB    0.000 MiB           if df is None:
    16  493.230 MiB -1807.098 MiB               df = tmp
    17                                     else:
    18 2500.980 MiB 2007.750 MiB               df = df.append(tmp, ignore_index=True)
    19 2300.328 MiB -200.652 MiB           del tmp
    20 2300.328 MiB    0.000 MiB           gc.collect()

GNU版time計測も4.4GBと同じ。

今回の例ではGCは余り効いてないでが、処理が長くて複雑な参照があると効くことも

ファイル分割

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.434 MiB    0.000 MiB   @profile
     9                             def main():
    10 2326.051 MiB 2237.617 MiB       tmp = pd.concat(map(get_df, glob.glob('tmp/*csv')), ignore_index=True)

concatなのにメモリ使用が少ない。。。

GNU版time計測だと4.4GB

mapがイテレータになってる効果があるかもなのでlistにキャストしてみます。

Line #    Mem usage    Increment   Line Contents
================================================
    11   88.668 MiB    0.000 MiB   @profile
    12                             def main():
    13 2325.824 MiB 2237.156 MiB       tmp = list(map(get_df, glob.glob('tmp/*csv')))
    14 2325.797 MiB   -0.027 MiB       tmp = pd.concat(tmp, ignore_index=True)

GNU版time計測だと4.4GB

listにキャストしても変わらないのでイテレータで渡す意味はなさそうです。

read_csvよりも複数ファイル読込の方がメモリ使用が少ないのは意外でした。read_csvはかなりバッファを持ってメモリ確保しているっぽいですね。

複数ファイル読込は、速度に加えてメモリ側にも恩恵がありそうです。

気になったので、copy=Falseも見てみます。

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.543 MiB    0.000 MiB   @profile
     9                             def main():
    10 2327.086 MiB 2238.543 MiB       tmp = pd.concat(map(get_df, glob.glob('tmp/*csv')), ignore_index=True, copy=False)

GNU版time計測も4.4GB

copy=Falseってあんま意味ないんかい。。。

列分割でのメモリ使用量

上の例では行でファイル分割しましたが、Kaggleでは特徴の出し入れを頻繁に行うので、

列分割での管理の方が理想的です。

以前こんなツイートをしたので合わせて検証します。

ファイルを前100列と残り163列に分割して検証します。

まずはpandas.merge()

Line #    Mem usage    Increment   Line Contents
================================================
     5   88.871 MiB    0.000 MiB   @profile
     6                             def main():
     7 1613.852 MiB 1524.980 MiB       df1 = pd.read_csv('train_data_first.csv')
     8 3381.113 MiB 1767.262 MiB       df2 = pd.read_csv('train_data_last.csv')
     9
    10 5387.531 MiB 2006.418 MiB       df = pd.merge(df1, df2, left_index=True, right_index=True) # パターン1
    11 5387.777 MiB    0.246 MiB       df = df1.merge(df2, left_index=True, right_index=True)) # パターン2
    12 3381.242 MiB -2006.535 MiB      df = pd.merge(df1, df2, left_index=True, right_index=True, copy=False) # パターン3

GNU版time計測は

  • パターン1: 5.5GB
  • パターン2: 5.5GB
  • パターン3: 4.7GB

pd.merge()とpd.DataFrame.merge()ではメモリ使用量に差はないようです。

mergeの場合はcopy=Falseが有効です。参照で問題ない場合は指定しましょう。

次に、numpy.concatenate

Line #    Mem usage    Increment   Line Contents
================================================
     6   88.859 MiB    0.000 MiB   @profile
     7                             def main():
     8 1613.848 MiB 1524.988 MiB       df1 = pd.read_csv('train_data_first.csv').values
     9 3381.113 MiB 1767.266 MiB       df2 = pd.read_csv('train_data_last.csv').values
    10
    11 5387.574 MiB 2006.461 MiB       df = np.concatenate([df1, df2], axis=1) # パターン1
    12 5387.645 MiB    0.070 MiB       df = np.hstack([df1, df2]) # パターン2
    13 5387.645 MiB    0.000 MiB       df = np.c_[df1, df2] # パターン3

GNU版time計測は全て同じでした。

  • パターン1: 5.5GB
  • パターン2: 5.5GB
  • パターン3: 5.5GB

copy指定しない場合は、numpy, pandasともに同じメモリ使用量でした。

私のツイートの数倍というのの再現は出来ませんでしたが、copy=Falseが良く効く場合だったのかもしれません。

KagglerのためのGit入門

お久しぶりです。絶賛ニートを楽しんでるtkm2261です。

今日はTwitterで意見が諸々散見されたので、私のKaggleでのgit活用法を共有しようと思います。

Gitの独学はハードルが高いですよね。。。

gitの運用は開発プロセスと密接なのであまり外に出てこなかったり、
実際に自分でやらないと覚えない系なので独学は結構難しいと思ってます。
(同じ理由で、テストコード系も広まらなかったり)

私も学生時代はSVNに馴染めなくて、業務でプロダクト開発して初めてバージョン管理が身についたタイプです。
最初はSourceTreeで慣れ親しんで、gitコマンドという流れを辿りました。いきなりGitコマンドのハードルの高さは理解してるつもりです。

www.sourcetreeapp.com

とはいえ、慣れるとこんなに便利なものはないので、出来る限り平易に語りたいと思います。

Kaggle利用特化(一人開発用)の最低限な要素を詰めましたので参考になれば幸いです。

変なとこあればTwitterまで

KaggleでのGitの有用性

個人的には複数環境で並行でタスクをこなすので、無いとやってられないです。

AWSのスポットインスタンスGCPマシン($300クーポンで確保)、自宅マシンの3マシンは大体いつも管理してます。

ある環境で上手く行ったものを、うまく行った箇所だけ全体に共有するのはGitが最適です。

勿論ですが作業ログが残るので結果の再現性に絶大な威力を発揮します。

フォルダ管理との長所短所

もう一つの有力な選択肢として、作業フォルダ丸ごとS3とかに定期的に上げてしまう方法が有力かと思います。

結論からいうと、たまにフォルダをs3に保存、普段はGitが最強かと思います。

長所

  • 作業が楽
  • ある地点に戻るのが楽
  • データも一緒に保存するので、再現性が高い

短所

  • サイズが大きいと時間がかかる
  • 結局しっちゃかめっちゃかになる
  • 別環境との同期がしづらい。
  • 差分管理がしづらい

Git入門したてだと『Gitで全部やるぞー』と気負いがちですが、組み合わせて一番楽な方法を取りましょう。

とりあえずこれだけ覚えておけばOK

GitやGithubは複数人開発するために様々な機能がありますが、Kaggle用に作業ログと再現性のためなら至ってシンプルに使えます。

  • 1: git add (gitにファイルを追加)
    • git add *.py *.R
    • git add -u
  • 2: git status (現在の状態確認)
  • 3: git commit -m “なにかコメント” (addしたファイルの変更確定)
  • 4: git push origin master (commitをアップロード)
  • 5: git pull (リモートの変更を手元に反映)
  • 6: git checkout (ブランチ切替、ファイル変更取消)
    • git checkout ファイル名
    • git checkout ブランチ名orハッシュ値
    • git checkout --ours ファイル名
  • 7: git reset --hard (変更全取消)

これだけで問題ないでしょう。応用編でstashとbranchは触れますが使わなくてもいけます。

各コマンドの詳細は後述

Git初心者最大の問題: どうやってある時点に戻るの?

ここでgit revert使いたい方もいるかもしれませんが、revertは色々面倒なことになるので止めましょう。

筆者推奨の戻し方は超原始的に、戻したいファイルに上書くだけです。

  • 別のフォルダに、新たにクローン
  • git checkout ハッシュ値 で戻したいところのコミットに戻る
  • このフォルダから戻したいファイルを、元のフォルダにコピー

あとはコミットコメントに戻した旨を書いておけば十分でしょう

Git初心者の気になりポイント

  • Q: プルリクとか言うもの使った方が良いんじゃないの?
    • A: 必要ありません。あれは複数人開発用&公開済みサービス用なのでKaggleでは使いません。業務でも公開前の最初期はプルリクでなくmaster pushでやる事も多いです。
  • Q: コンフリクト怖い
    • A: git checkout --ours ファイル名 を覚えるだけで大丈夫です。(後述)
  • Q: ブランチってやつ切るんでしょ?
    • A: 必要ありません。実験的なコードを残す方法として応用編で触れますが、実験的なコードはファイル名別にしてmaster pushで良いかと
  • Q: master push怖い
    • A: 自分しか使わないので存分に壊しましょう。最悪リポジトリ作り直せばいいですし
  • Q: いまどういう状態かわからなくなった
    • A: git statusでみれます。ただ初心者にはわかりづらいので慣れるまではSorceTreeを使いましょう

.gitignoreの運用

Git初心者ありがちなミスとして、巨大ファイルをgitに入れてしまうミスがあります。

これやると巨大ファイルの差分管理してしまい、元のファイル以上にgitを圧迫して、フォルダ管理より取り回しが悪いものになってしまいます

間違って入れたら、git rm --cached ファイル名 で消すしか無いですが、その前に.gitignoreを書いて未然に防ぎましょう。

data/
input/
*.csv
*.gz
*.pkl
*~
*.swp

Kagglerなら公式カーネル準拠でinput/以下にデータを入れることが多いと思いますので指定しておくと安全です。
あと私は加工データをdata/以下によく入れてるので追加してあります。

各コマンド詳細

あとは補足内容なので、分からない箇所だけ参考にしてください。

1. git add (gitにファイルを追加)

ファイルをgit管理以下に追加するコマンドです。

  • git add *.py *.R (PythonとかRのコードを追加)
  • git add -u (変更のあった管理下に入ってるファイルを追加)

これで十分かと思います。git add -Aとかは.gitignoreがしっかり指定してあればOKですが、巨大ファイルを入れる危険性があるので微妙です。

2. git status (現在の状態確認)

いまのGitの状態を確認するコマンド。全部addが終わった後や、コミット前には必ず見る習慣を付けましょう。

3. git commit -m “なにかコメント” (addしたファイルの変更確定)

コメントをつけてaddしたファイルの変更を確定します。

コメントは適当でよいですが、作業が思い出せるレベルではあったほうが良いです。

Kagglerの場合は作業でわけられない時があると思うので、日付だけとかでも良いです

理想的には毎submit後にはcommitしてLBスコアとかをコメントに入れておくと良いです

4. git push origin master (commitをアップロード)

リモートにcommit内容をアップロードします。他環境から先にpushがあるとrejectされますが、その時は先にpullします。

Kaggleの場合はアップロードためらう要素はないので、commitしたらpushする癖を付けましょう。

5. git pull (リモートの変更を手元に反映)

リモート内容を手元に反映します。ブランチを指定した方が丁寧ですが、Kaggle用途なら省略可です。

他環境で先にpushがあると失敗します。手元の変更と被らなければpullするだけで解決しますが、失敗する場合は下記で対処。

  • リモートのファイルを正としたい場合 ・・・ git checkout ファイル名 で戻してpullしましょう。手元の変更が全て要らないならgit reset --hard
  • 手元のファイルを正としたい場合 ・・・ 手元ファイルを何処かにコピーして、↑でリモートのファイルを正とした後、コピーで戻しましょう
  • 両方の変更を保持したい場合 ・・・ 対象ファイルをcommitしてpullしましょう。コンフリクトが起きる可能性がありますがgit checkout --ours ファイル名 を使いましょう。(後述)

6. git checkout (ブランチ切替、ファイル変更取消)

checkoutの用途としては、ブランチの切り替えとファイルの変更を取り消すの2つがありますが、

ここではブランチは使わないので、ファイルの変更を前のcommitに戻すコマンドとしてgit checkout ファイル名 を覚えておきましょう。

これは消してしまったファイルにも有効で、ミスってrmしてしまったらcheckoutでザオリクしましょう。

git checkout --ours ファイル名

度々登場しました、コンフリクトを解消するコマンドです。

勿論分かる人はファイルを直接編集してコンフリクト解消しても良いですが、Kagglerは手元のファイルを正としたい場合がほとんどと思います。

『--ours』の通り、手元環境を正としてコンフリクトを解決してくれます。一応、リモートを正とするgit checkout --theirsもあります。

7. git reset --hard (変更全取消)

通称バルスコマンド。手元の変更を全て破棄して直前commitの状態に戻します。

git管理下のファイルだけなので、作ったデータファイルとかは消えません。

応用編

git diff (ファイル差分確認)

前のcommitとの変更差分が見れます。ファイル名を指定するとそのファイルのdiffだけ見れます。

学習実行前のチェックに流すと良いです。

git rm --cached ファイル名 (git管理から除外)

ファイルをGit管理下から外します。--cachedが無いとファイル自体も消えるので注意

git log (コミット履歴確認)

コミット履歴を見れます。checkoutであるcommitに戻るときのハッシュ値を調べるのに重宝します。

git branch ブランチ名 (ブランチ作成)

ブランチを作ります。Kaggleではブランチで残したいケースは少ないと思いますが、Gitのメイン機能なので覚えて起きましょう。

作ったあとcheckoutで切り替えるのを忘れずに。

用途としては、

  • 実験的なコード書いて、ファイル別にするのも面倒だからブランチとして残す
  • configファイルを別にして、CPU用、並列実行用、GPU用に分ける

とか考えられます。

特に後者はdevelopブランチの変更をmaster, parallel, gpuブランチに適用みたいな、
hotfixをmasterとreleaseに適用する的な業務レベルのgit使いが要求されるのでKaggleではそこまでやらないかと

そこまで出来るgit使いはご自由どうぞ

git stash (手元変更の一時保存)

手元の変更を一時保存して、git stash popで元に戻せるコマンドです。

変更がかぶると、git stash popで戻せなくなることがあるので、これを使うならbranchを切りましょう

変更前のコードを見たいならgthubかBitbucketをブラウザから見ましょう

トラブルシューティング

何かあれば随時増やします

間違ってaddしてしまった

git reset --sorf HEAD ファイル名で外せます。

ただ新ファイルとかの場合は少し違うので、git statusすると外し方書いてあるので読みましょう。

間違ってcommitしてしまった。

git reset –soft HEAD^ で戻ります。

ファイルのadd忘れや、コメント直したいだけの場合はgit commit –amendで変更できます。

ただし、これはPush前の話なのでPushしちゃったら諦めて、直したコミットを更に上げましょう

commitログがちょっと汚くなるレベルなのでKaggleでは気にする事ないです。

gitってどのサービス使えば良いの?

サービス使わずに自分でホストしても良いですが。スポットインスタンスから見れるように

githubやBitbucketを使う方が良いでしょう。

特にBitbucketはプライベートリポジトリが無限に持てるのと、SourceTree連携があるので初心者にはBitbucketをオススメします

bitbucket.org

結局お前はどう使ってるの?

大した事はしてません。よくやるのは特徴量作成と学習は別のマシンでやって、上手くいったら各自pushで気が向いたらpullみたいな感じです。

ファイルがそもそも違うので、ほとんどコンフリクトも起こらないので単純です。

パラメタサーチの時は同じファイルいじりますが、変更が大きければcommitしてmergeしますが、コピペで片方に持ってって片方はgit resetで破棄とか普通にやります。

私は業務で慣れることが出来たので大体対処出来ますが、『何か起きたら。。。』といった不安がgit導入の最大障壁ですよね。。。

ニートになりました。

表題のとおり2017/07/01ニートとなりました。

詳細はTwitterに書いたとおりです

退職エントリが長いやつに碌なのがいないと思ってるので、特にこれ以上ありません。

勉強会とか誘って頂けると超助かります。