Kaggler Slack作りました
追記 2023/03/17
また変わっていました。
こちらが現在のkaggler-ja slack参加URLです。(2023/03/17確認)万一期限が切れていたらDMなど一報頂けると助かります。https://t.co/tylk3CY6nk https://t.co/6cFUBfDBui
— Takami Sato (@tkm2261) 2023年3月17日
追記 2022/10/23
また変わっていました。
こちらが現在のkaggler-ja slack参加URLです。(2022/08/16確認)万一期限が切れていたらDMなど一報頂けると助かります。https://t.co/soKMpRp7We https://t.co/4pAmML8id7
— Takami Sato (@tkm2261) 2022年8月15日
追記 2021/03/11
Slack側の仕様変更があり招待URLが期限切れになってました。
こちらが現在のkaggler-ja slack参加URLです。Never Expireのオプションが復活して期限切れはしなくなったと思いますが。万一期限が切れていたらDMなど一報頂けると助かります。https://t.co/ITT42hPKzV https://t.co/4pAmML8id7
— Takami Sato (@tkm2261) 2021年3月10日
現在は下記のURLから入れますが、もし万が一期限切れになっていましたらTwitterのDMでお知らせ頂けると助かります。
Kaggler-ja slack
Twitterでは拡散しましたが、検索で来る人もいそうなので宣伝
@smlyさん @threecourseさんに
— Takami Sato (@tkm2261) 2017年8月22日
日本Kagglerのslackオープンしてもらいました!
誰でもウェルカムです!ただ互助会なので業者や勧誘は蹴飛ばします!
皆でKaggleやりませうhttps://t.co/PUuyCJFk8F
https://kaggler-ja.herokuapp.com/
オープンチャンネルなのでメール入れれば誰でも入れます
誰もチェックとかしてないので気軽にどうぞー
細かすぎて伝わらないLightGBM活用法 (callback関数)
皆様tkm2261です。この頃連投が続いてますが、
最近まで参加していたInstacart Market Basket Analysis | Kaggleで色々やったので残しておこうと思います。
このcallback関数は便利ですが、Kaggleなどでヘビーに使う人以外ここまでしないと思うので活躍するかは微妙なところです。。。
LightGBMのtrain関数を読み解く
xgboostもそうですが、lightgbmにもtrain()という関数がありLightGBMユーザはこれを使って学習を実行します。
scikit-learn APIも内部ではこの関数を呼んでいるので同じです。
この引数にcallbacksというのがあり、殆どのユーザは使っていないと思います。(私も今回初)
このcallbacksの活用法が今回のトピックになります。
train関数の実装を見てみると, 192行目ぐらいでBoostingを回しています。
LightGBM/engine.py at master · Microsoft/LightGBM · GitHub
# lightgbm/engine.py: 192 for i in range_(init_iteration, init_iteration + num_boost_round): for cb in callbacks_before_iter: cb(callback.CallbackEnv(model=booster, params=params, iteration=i, begin_iteration=init_iteration, end_iteration=init_iteration + num_boost_round, evaluation_result_list=None)) booster.update(fobj=fobj) evaluation_result_list = [] # check evaluation result. if valid_sets is not None: if is_valid_contain_train: evaluation_result_list.extend(booster.eval_train(feval)) evaluation_result_list.extend(booster.eval_valid(feval)) try: for cb in callbacks_after_iter: cb(callback.CallbackEnv(model=booster, params=params, iteration=i, begin_iteration=init_iteration, end_iteration=init_iteration + num_boost_round, evaluation_result_list=evaluation_result_list)) except callback.EarlyStopException as earlyStopException: booster.best_iteration = earlyStopException.best_iteration + 1 evaluation_result_list = earlyStopException.best_score break
意外に思われるかもしれませんが、boostingのfor文はpython側で回ってます。
early stoppingもこっちで例外として書いてあるのでcallback関数さえかければループ毎にかなり動的に書くことが出来ます。
例えばearly stoppingの条件をより複雑にしたり、loggingをちゃんと仕込んで普通は標準出力に吐かれて保存が難しい学習の様子をファイルに残すことが出来ます。
train関数に渡したcallback関数はcallbacks_after_iterに渡るのでこちらをこれから見ていきます。
callback関数の仕様
custom objectiveやcustom metricと異なりcallbackの仕様は英語でもドキュメントがありません。
ただ実装は素直なので見ていきます。
文字で説明してもアレなので、先に私のコンペの実装を見せます。
def get_pred_metric(pred, dtrain): """予測値無理やり取るmetric """ return 'pred', pred, True def callback(env): """ligthgbm callback関数(instacartコンペ) :param lightgbm.callback.CallbackEnv env: 学習中のデータ """ # 10回毎に実行 if (env.iteration + 1) % 10 != 0: return clf = env.model # 学習中モデル trn_env = clf.train_set # 学習データ (ラベルだけが入ってる※後述) val_env = clf.valid_sets[0] # 検証データ (ラベルだけが入ってる※後述) # 無理やり予測値をとる preds = [ele[2] for ele in clf.eval_train(get_pred_metric) if ele[1] == 'pred'][0] # ラベル取得Cython用に型を指定 labels = trn_env.get_label().astype(np.int) # ユーザ毎のしきい値で予測を0-1にしてデータ毎に正解なら1, 不正解なら0を返す関数 # (list_idxはgroupでグローバルでアクセス) res = f1_group_idx(labels, preds, list_idx).astype(np.bool) # 間違ってるデータのウェイトを上げるて正解のデータのウェイトを下げる weight = trn_env.get_weight() if weight is None: weight = np.ones(preds.shape[0]) weight[res] *= 0.8 weight[~res] *= 1.25 trn_env.set_weight(weight) # 検証データのmetricを計算 preds = [ele[2] for ele in clf.eval_valid(get_pred_metric) if ele[1] == 'pred'][0] labels = val_env.get_label().astype(np.int) t = time.time() res = f1_group(labels, preds, list_idx) sc = np.mean(res) logger.info('cal [{}] {} {}'.format(env.iteration + 1, sc, time.time() - t))
受け取る引数はひとつ (lightgbm.callback.CallbackEnv)
ただの名前付きタプルです。lightgbmユーザなら名前で大体中身が分かるはずです。
唯一よく知らないのがmodel(Booster)かと思うので、この後見ていきます。
begin_iterationとend_iterationがあるのはinit_model指定して学習を途中からやる場合があるからです。
CallbackEnv = collections.namedtuple( "LightGBMCallbackEnv", ["model", "params", "iteration", "begin_iteration", "end_iteration", "evaluation_result_list"])
LightGBM/callback.py at master · Microsoft/LightGBM · GitHub
データのアクセスはAPI経由
callback関数最大の難関は、lightgbmの学習データをpythonから直接アクセス出来ないことです。
なぜかというと、学習はCでやるためCのオブジェクトでデータを持ってるためです。
メモリ的にPython側でも持つわけには行かないので仕方ありません。
もっと良いやり方があるかもしれませんが、学習データにはcustom metricでアクセスするのが一番楽そうでした。
なのでget_pred_metricみたいな一切計算しないで予測値を返すだけのcustom metricを実装して無理やり取得します。
preds = [ele[2] for ele in clf.eval_train(get_pred_metric) if ele[1] == 'pred'][0] preds = [ele[2] for ele in clf.eval_valid(get_pred_metric) if ele[1] == 'pred'][0]
Booster.eval_trainとBooster.eval_testで学習データと検証データのそれぞれの予測値がとれます。
データは複数渡せるので、最後に先頭のだけ取ります。
ラベルは普通にget_label()でとれます。
その他の事はBoosterの実装を見てください。
LightGBM/basic.py at master · Microsoft/LightGBM · GitHub
あとは好きに実装
私の場合は、metricの計算が遅いので10回反復毎にしたり、反復毎にデータのウェイトを変えたり、ログに書いたりといった活用をしました。
ただ反復毎にデータのウェイトを変えるのはC側の変更も必要なので注意下さい。
LightGBMで反復毎に動的にsample weight変えるの凄い面倒かった
— Takami Sato (@tkm2261) 2017年8月3日
目的関数のコンストラクタで重みを最初に受け取ってるのでAPIではどうにも出来ない
競プロでC++力高めていて助かった
自前early stoppingのやり方
↑のboosting反復の実装みれば分かる通り、lightgbm.callback.EarlyStopExceptionを上げるだけです。
callback関数内で好きに実装してraiseしましょう。
class EarlyStopException(Exception): """Exception of early stopping. Parameters ---------- best_iteration : int The best iteration stopped. """ def __init__(self, best_iteration, best_score): super(EarlyStopException, self).__init__() self.best_iteration = best_iteration self.best_score = best_score
LightGBM/callback.py at master · Microsoft/LightGBM · GitHub
ただ未検証なので、ちゃんと巻き戻るかとか検証したら教えて下さい。。。
一応書いたけど。。。
ここまで学習を制御すること無いので普通使わないですが、忘れそうなので備忘で記事にしました。
道具としてのCython
皆様tkm2261です。今日は道具としてのCythonと題して、
使うことに特化してCythonの解説をしたいと思います。
- きっかけはKaggle
- Cythonを使うとき
- FaronさんのF1最適化
- DP (Dynamic Programming)を含んだ実装はCythonの出番
- 使い方 その1 『Cython実装』
- 使い方 その2『コンパイル』
- 使い方 その3『呼び出し』
- 速度検証
- 付録: 実装 utils.pyx
きっかけはKaggle
最近まで参加していたInstacart Market Basket Analysis | Kaggleで
どうしても高速化したい処理があり実装しました。
Numbaも良いのですが、速くなるときとならないときがあり、JITよりもコンパイルが速いのは自明なのでCythonにしました。
Cythonを使うとき
Cythonはコードの可読性を下げてしまうので、下記のような事を満たすケースか考えた方が良いです。
- 処理が切り出せる
- numpy配列の処理(C言語で書けって言われたら書けるレベル)
- numpyの関数では表現できない。(配列をfor文でアクセスしないといけない)
- 呼び出し回数が数千回以上
今回のKaggleではこれが当てはまったので紹介します。
FaronさんのF1最適化
このコンペではユーザ毎にアイテムを推薦してそのF1値の平均が指標でした。
つまり、予測値に対して閾値を設けて0-1にする必要があります。
Faronさんは「Ye, N., Chai, K., Lee, W., and Chieu, H. Optimizing F-measures: A Tale of Two Approaches. In ICML, 2012.」のアルゴリズムを実装してカーネルとして公開してくれました。
F1-Score Expectation Maximization in O(n²) | Kaggle
これが絶大な威力を発揮して、コンペはこれの公開後から別の様相を呈しました。
DP (Dynamic Programming)を含んだ実装はCythonの出番
このFaronさんの実装を見てもらえばすぐわかりますが、
いつくかDPテーブルを確保して、for文を回しています。これは超Cython向きです。
使い方 その1 『Cython実装』
実装自体はこの記事最後に付けたので参照下さい。ここではPythonとの相違点をメインに話します。
ファイルは.pyx
とくに考えずに、.pyではなく.pyxとしましょう。
cimport
CythonやNumpyのCライブラリに直接アクセスするためにcimportします
cimport cython
cimport numpy as np
ライブラリやヘッダにパスを通す必要がありますが、それは後ほど
型宣言
cdefを付けて使う前に宣言してください。
cdef int i, j, k
一番上でi,j,kをintで宣言するのは競プロっぽいですね。
実際、競プロ+CythonをやればPythonコードをかなり速く出来ると思います。
配列の場合は次元数と型を宣言してください。
cdef np.ndarray[double, ndim = 2] DP_C cdef np.ndarray[long, ndim= 1] idx
np.float64で渡すと、Cython側ではlong型になるので注意しましょう。メモリにうるさい場合は呼び出しの方でも小さい整数型を使って下さい。
Cythonでは配列の次元、型指定が速度のキーとなるので必ず指定しましょう。
これやらないと使う意味ないです。
オプション指定
Pythonでは配列の長さを超えてアクセスするとエラーになったり、-1でアクセスすると一番後ろが取れたり便利な機能があります。
裏を返せば、そこのチェックに時間を使っているので、これを切ると高速になります。
@cython.boundscheck(False) @cython.wraparound(False)
ただしC言語同様にSegmentation Faultを吐くようになります。
普通のPythonも書ける
今回はユーザ分並列したかったのでmultiprocessingも呼んでいます。
付録の感じで普通のPythonコードもpyxファイル内で呼ぶことが出来ます。
ただし、高速化は余りされないので注意しましょう。
※ CythonでGIL外してマルチスレッド処理も可能ですが、超面倒なので極力避けましょう
このf1_groupはlightgbmの評価関数として使える形になっており、これを使ってコンペではチューニングしていますた。
使い方 その2『コンパイル』
ここではsetup.pyを使うコンパイルを紹介します。
pyximport使うのもありますが、呼び出し元でcythonを意識したくないのでsetup.pyで.soを作ります。
コンパイルするpyxファイルはutils.pyxとします。
setup.pyの書き方
from distutils.core import setup from distutils.extension import Extension from Cython.Distutils import build_ext import numpy setup( cmdclass={'build_ext': build_ext}, ext_modules=[Extension("utils", ["utils.pyx"])], include_dirs=[numpy.get_include()], extra_compile_args=['-fopenmp'], extra_link_args=['-fopenmp'], )
numpy.get_include()で動的に持ってくるのがミソです。
コンパイルの実行
下記を実行してすると、
python setup.py build_ext --inplace
というファイルが作成されます。これを実行したいフォルダに持っていきます。
使い方 その3『呼び出し』
作成されたsoファイルを実行したいフォルダに持ってきて普通にimportできます
from utils import f1_opt
soファイルのドット前の名前でimport出来ます。
速度検証
実際に1万人に対して並列せずに実行して速度を見てみます。
計算時間 | |
---|---|
Python実装 | 241.1秒 |
Cython実装 | 21.5秒 |
約12倍の高速化になっています。
使いすぎには注意ですが、Cythonは非常に強力なツールです。
付録: 実装 utils.pyx
スネークケースとかごっちゃになってますがコピペして持ってきたのですみません。。。
cimport cython import numpy as np cimport numpy as np from sklearn.metrics import f1_score @cython.boundscheck(False) @cython.wraparound(False) def f1_opt(np.ndarray[long, ndim=1] label, np.ndarray[double, ndim=1] preds): """ ユーザ毎の期待F1値最大化&その時の実際のF1値取得関数 :param label np.ndarray: 0-1の正解ラベル :param preds np.ndarray: 予測確率ラベル """ cdef int i, j, k, k1 cdef double f1, score, f1None, pNone cdef long n = preds.shape[0] pNone = (1 - preds).prod() cdef np.ndarray[long, ndim= 1] idx = np.argsort(preds)[::-1] label = label[idx] preds = preds[idx] cdef np.ndarray[double, ndim = 2] DP_C = np.zeros((n + 2, n + 1), dtype=np.float) DP_C[0, 0] = 1.0 for j in range(1, n): DP_C[0, j] = (1.0 - preds[j - 1]) * DP_C[0, j - 1] for i in range(1, n + 1): DP_C[i, i] = DP_C[i - 1, i - 1] * preds[i - 1] for j in range(i + 1, n + 1): DP_C[i, j] = preds[j - 1] * DP_C[i - 1, j - 1] + (1.0 - preds[j - 1]) * DP_C[i, j - 1] cdef np.ndarray[double, ndim = 1] DP_S = np.zeros((2 * n + 1,)) cdef np.ndarray[double, ndim = 1] DP_SNone = np.zeros((2 * n + 1,)) for i in range(1, 2 * n + 1): DP_S[i] = 1. / (1. * i) DP_SNone[i] = 1. / (1. * i + 1) score = -1 cdef np.ndarray[double, ndim= 1] expectations = np.zeros(n + 1) cdef np.ndarray[double, ndim= 1] expectationsNone = np.zeros(n + 1) for k in range(n + 1)[::-1]: f1 = 0 f1None = 0 for k1 in range(n + 1): f1 += 2 * k1 * DP_C[k1][k] * DP_S[k + k1] f1None += 2 * k1 * DP_C[k1][k] * DP_SNone[k + k1] for i in range(1, 2 * k - 1): DP_S[i] = (1 - preds[k - 1]) * DP_S[i] + preds[k - 1] * DP_S[i + 1] DP_SNone[i] = (1 - preds[k - 1]) * DP_SNone[i] + preds[k - 1] * DP_SNone[i + 1] expectations[k] = f1 expectationsNone[k] = f1None + 2 * pNone / (2 + k) if expectations.max() > expectationsNone.max(): i = np.argsort(expectations)[n] - 1 tp = label[:i + 1].sum() if tp > 0: precision = tp / (i + 1) recall = tp / label.sum() f1 = (2 * precision * recall) / (precision + recall) else: f1 = 0 else: i = np.argsort(expectationsNone)[n] - 1 tp = label[:i + 1].sum() if label.sum() != 0 else 1 if tp > 0: precision = tp / (i + 2) recall = tp / max(label.sum(), 1) f1 = (2 * precision * recall) / (precision + recall) else: f1 = 0 return f1 from multiprocessing import Pool @cython.boundscheck(False) @cython.wraparound(False) def f1_group(np.ndarray[long, ndim=1] label, np.ndarray[double, ndim=1] preds, np.ndarray[long, ndim=1] group): """ ユーザ平均F1値取得関数 :param np.ndarray label: 0-1の正解ラベル :param np.ndarray preds: 予測確率ラベル :param np.ndarray group: ユーザの切れ目(lightgbmのgroup定義に準拠) """ cdef int i, start, end, j, s cdef double score = 0. cdef long m = group.shape[0] cdef long n = preds.shape[0] start = 0 p = Pool() list_p = [] for i in range(m): end = start + group[i] list_p.append(p.apply_async(f1_opt, (label[start:end], preds[start:end],))) start = end scores = [a.get() for a in list_p] p.close() p.join() return np.mean(scores)
PythonでCSVを高速&省メモリに読みたい
今日はPython (Pandas)で高速にCSVを読むことに挑戦したいと思います。
Kaggleに参加するたびに、イライラしていたので各実装の白黒はっきりさせようと思います。
R使いが羨ましいなぁと思う第一位がCSV読込が簡単に並列出来て速いことなので、
なんとかGILのあるPythonでも高速に読み込みたいと思います。
ただ、この検証ではコーディング量が多いものは検証しません。
CSV読込は頻出するので、フットワークの軽さが重要です。(オレオレライブラリ嫌い)
Pickleは早いけど。。。
Kaggleでは、大規模なCSVを高速に読みたいということが頻発します。
シリアライズだけなら、Pickleでのread/writeが爆速なのですが、
- バイナリなので中身が確認できない
- 一度メモリに全展開が必要
と欠点もあるため、なるべくCSVで管理したいところです。
見てみるとこんな感じ。
Line # Mem usage Increment Line Contents ================================================ 8 88.887 MiB 0.000 MiB @profile 9 def main(): 10 4116.051 MiB 4027.164 MiB df = pd.read_csv('train_data.csv') # .values 11 4116.051 MiB 0.000 MiB with open('aaa', 'wb') as f: 12 4116.051 MiB 0.000 MiB pickle.dump(df, f, -1) 13 4116.051 MiB 0.000 MiB with open('aaa', 'rb') as f: 14 4116.051 MiB 0.000 MiB df = pickle.load(f)
pickleはもっとメモリ喰うと思っていたので、GNU版のtimeで調べると
$ /usr/bin/time -f "%M KB" python test.py 27827088 KB
約28GB… いやいやメモリ使いすぎでしょう。。。
個人的な感覚はオブジェクトサイズの2倍程度だと思ってたので驚きです。こいつはどうにかしないといけません。
皆様、memory_profilerは関数内部で消費されたメモリは表示されませんので気をつけて!
(実は最後の最後で気づいて全実験をやり直した。マジでmemory_profilerさん勘弁して下さい。)
結論はDask使おう!
今回の計測結果がこちらです。
読み込み後のオブジェクトが約2GBぐらいなのでそれを参考に見て下さい。
カッコが付いているところは計測したけど子プロセス分入ってるか自信がない箇所です。
実装 | 実行時間 | memory_profile最大使用メモリ | GNU版timeの最大使用メモリ |
---|---|---|---|
pandas.read_csv() | 39.2s | 4.0GB | 6.3GB |
pandas.read_csv() (dtype指定) | 37.2s | 2.0GB | 3.2GB |
pandas.read_csv() (gzip圧縮) | 48.5s | 4.0GB | 6.3GB |
numpy.genfromtxt() | 4min 41s | timeout | 22.8 GB |
pandas.read_csv() (chunksize指定 + multiprocessing) | 43.6s | 計測不可 | (4.6GB) |
pandas.read_csv() (chunksize指定) | 40.6s | 2.5GB | 4.4GB |
pandas.read_csv() (chunksize指定+GC) | 40.8s | 2.5GB | 4.4GB |
dask.dataframe.read_csv() (dask.multiprocessing.get) | 16.3s | 計測不可 | (4.2GB) |
ファイル分割(10ファイル・並列なし) | 49.1s | 2.3GB | 4.4GB |
ファイル分割(10ファイル・並列) | 8.89s | 計測不可 | (4.2GB) |
pickle | 2.18s | 2.1GB | 28GB |
計測した結果から言うと、daskを使うのが速くて実装が楽です! 、デフォルトread_csvはかなりメモリを使用します!
ファイル分割が一番効くのはそうなんですが、↑の結果は行での分割なのでKaggleとかの特徴量で管理したいときには微妙なんですよね。
daskは複数ファイル読込も対応しているので、2行で書けるdaskの記法に統一してしまっても良さげです。
16コアで2倍程度の高速化というのはちょっと物足りない気もしますが、1ファイルなのでしょうがないんですかね
列で分割した場合の結果も後述しますが、pd.mergeではcopy=Falseを指定した方が良いです。
検証環境
当初はBash on Windows環境でやってたのですが、こちらはマルチスレッドでの挙動が怪しくてボツにしました。
データ
- 1,000,000行, 263列
- 2.6GB (GZIP圧縮後0.54GB)
速度検証
pandas.read_csv()
まずは王道のチェック
In [2]: %time tmp = pd.read_csv('train_data.csv') CPU times: user 37.1 s, sys: 2.06 s, total: 39.2 s Wall time: 39.2 s
この40秒がベンチマーク
pandas.read_csv() (dtype指定)
dtypeを指定してみた。
In [4]: %time tmp = pd.read_csv('train_data.csv', dtype=np.float32) CPU times: user 36.7 s, sys: 564 ms, total: 37.2 s Wall time: 37.2 s
若干速くなった。
pandas.read_csv() (gzip圧縮)
SSDのIO性能がボトルネックの可能性があるのでgzip圧縮もテスト
In [2]: %time tmp = pd.read_csv('train_data.csv.gz') CPU times: user 47.5 s, sys: 984 ms, total: 48.5 s Wall time: 48.5 s
gzipは速くはならないみたい
numpy.genfromtxt()
numpy.loadtxtは欠損値があると読み込めないので、こちらを検証
In [3]: %time tmp = np.genfromtxt('train_data.csv', delimiter=',', skip_header=1) CPU times: user 4min 33s, sys: 7.98 s, total: 4min 41s Wall time: 4min 41s
4分ぐらいかかっているので、pandasの実装の方が良いみたいですね。
pandas.read_csv() (chunksize指定 + multiprocessing)
実装
import numpy as np import pandas as pd from multiprocessing import Pool def get_df(df): return df p = Pool() tmp = pd.concat(list(p.map(get_df, pd.read_csv('train_data.csv', chunksize=100000))), ignore_index=True) p.close() p.join()
結果
CPU times: user 39.4 s, sys: 3.9 s, total: 43.3 s Wall time: 43.6 s
CPUもちゃんと全部動いておらず、この実装は駄目みたいですね。
やるならファイルを分割して並列読込が良さそう。
pandas.read_csv() (chunksize指定)
メモリが少ないときは使う実装なので一応確認
実装
import numpy as np import pandas as pd from multiprocessing import Pool df = None for tmp in pd.read_csv('train_data.csv', chunksize=100000): if df is None: df = tmp else: df = df.append(tmp, ignore_index=True)
結果
In [1]: %time %run hoge.py CPU times: user 31.7 s, sys: 8.94 s, total: 40.6 s Wall time: 40.6 s
さほど時間が変わらず読み込めています。
pandas.read_csv() (chunksize指定 + GC)
さらにメモリに気を使ってGC入れると
import gc import numpy as np import pandas as pd from multiprocessing import Pool df = None for tmp in pd.read_csv('train_data.csv', chunksize=100000): if df is None: df = tmp else: df = df.append(tmp, ignore_index=True) del tmp gc.collect()
In [1]: %time %run test.py CPU times: user 37.5 s, sys: 3.32 s, total: 40.8 s Wall time: 40.8 s
ほぼ変わらない時間で実行出来ています。誤差レベルなのでメモリ無い時はGC入れときましょう
dask.dataframe.read_csv()
並列といえばdask! ということで試します。
import dask.dataframe as ddf import dask.multiprocessing df = ddf.read_csv('train_data.csv') df = df.compute(get=dask.multiprocessing.get)
In [1]: %time %run test.py CPU times: user 7.16 s, sys: 7.49 s, total: 14.6 s Wall time: 16.3 s
CPUが全部動いて、倍ぐらいは速くなりました!
ただ16コアで倍なので、もうすこし贅沢を言いたい気もする
あとCHUNKSIZEを指定すると、上手く並列しなかったのでデフォルトで良さげです。
ファイル分割
100,000行ごとに10ファイルに分割して読込してみる
import glob import numpy as np import pandas as pd from multiprocessing import Pool def get_df(path): return pd.read_csv(path) p = Pool() tmp = pd.concat(p.map(get_df, glob.glob('tmp/*csv')), ignore_index=True) p.close() p.join()
In [1]: %time %run test.py CPU times: user 1.73 s, sys: 1.92 s, total: 3.66 s Wall time: 8.89 s
流石に速い。10倍ぐらい速くなってますね。
あと、本来ならコア数の倍数で分割するのが良さげです
メモリ使用量
ただ速ければいいというのは不公平なのでメモリも測ります。
特にpd.concatの瞬間に凄いメモリを使ってる可能性があります。
memory_profilerで見てみます
pandas.read_csv()
Line # Mem usage Increment Line Contents ================================================ 8 88.660 MiB 0.000 MiB @profile 9 def main(): 10 4115.828 MiB 4027.168 MiB tmp = pd.read_csv('train_data.csv')
普通にread_csvすると4GBほど使用
GNU版time計測だと、6.3GB
pandas.read_csv() (dtype指定)
Line # Mem usage Increment Line Contents ================================================ 8 88.680 MiB 0.000 MiB @profile 9 def main(): 10 2109.586 MiB 2020.906 MiB tmp = pd.read_csv('train_data.csv', dtype=np.float32)
64bit -> 32bitで2GBになり半分に減少
GNU版time計測だと3.2GB 同様ですね。
pandas.read_csv() (chunksize指定)
Line # Mem usage Increment Line Contents ================================================ 11 88.633 MiB 0.000 MiB @profile 12 def main(): 13 88.633 MiB 0.000 MiB df = None 14 2497.973 MiB 2409.340 MiB for tmp in pd.read_csv('train_data.csv', chunksize=100000): 15 2297.340 MiB -200.633 MiB if df is None: 16 492.543 MiB -1804.797 MiB df = tmp 17 else: 18 2497.992 MiB 2005.449 MiB df = df.append(tmp, ignore_index=True)
確かにchunkで読み込むとメモリ使用が減少してますね。
GNU版time計測だと4.4GB 一応素のread_csvよりは減っています。
pandas.read_csv() (chunksize指定 + GC)
Line # Mem usage Increment Line Contents ================================================ 11 89.121 MiB 0.000 MiB @profile 12 def main(): 13 89.121 MiB 0.000 MiB df = None 14 2300.328 MiB 2211.207 MiB for tmp in pd.read_csv('train_data.csv', chunksize=100000): 15 2300.328 MiB 0.000 MiB if df is None: 16 493.230 MiB -1807.098 MiB df = tmp 17 else: 18 2500.980 MiB 2007.750 MiB df = df.append(tmp, ignore_index=True) 19 2300.328 MiB -200.652 MiB del tmp 20 2300.328 MiB 0.000 MiB gc.collect()
GNU版time計測も4.4GBと同じ。
今回の例ではGCは余り効いてないでが、処理が長くて複雑な参照があると効くことも
ファイル分割
Line # Mem usage Increment Line Contents ================================================ 8 88.434 MiB 0.000 MiB @profile 9 def main(): 10 2326.051 MiB 2237.617 MiB tmp = pd.concat(map(get_df, glob.glob('tmp/*csv')), ignore_index=True)
concatなのにメモリ使用が少ない。。。
GNU版time計測だと4.4GB
mapがイテレータになってる効果があるかもなのでlistにキャストしてみます。
Line # Mem usage Increment Line Contents ================================================ 11 88.668 MiB 0.000 MiB @profile 12 def main(): 13 2325.824 MiB 2237.156 MiB tmp = list(map(get_df, glob.glob('tmp/*csv'))) 14 2325.797 MiB -0.027 MiB tmp = pd.concat(tmp, ignore_index=True)
GNU版time計測だと4.4GB
listにキャストしても変わらないのでイテレータで渡す意味はなさそうです。
read_csvよりも複数ファイル読込の方がメモリ使用が少ないのは意外でした。read_csvはかなりバッファを持ってメモリ確保しているっぽいですね。
複数ファイル読込は、速度に加えてメモリ側にも恩恵がありそうです。
気になったので、copy=Falseも見てみます。
Line # Mem usage Increment Line Contents ================================================ 8 88.543 MiB 0.000 MiB @profile 9 def main(): 10 2327.086 MiB 2238.543 MiB tmp = pd.concat(map(get_df, glob.glob('tmp/*csv')), ignore_index=True, copy=False)
GNU版time計測も4.4GB
copy=Falseってあんま意味ないんかい。。。
列分割でのメモリ使用量
上の例では行でファイル分割しましたが、Kaggleでは特徴の出し入れを頻繁に行うので、
列分割での管理の方が理想的です。
以前こんなツイートをしたので合わせて検証します。
numpy.concatinateよりもpandas.DataFrame.mergeでくっつけたほうが数倍メモリ使用が抑えられるという知見が得られた
— Takami Sato (@tkm2261) 2017年7月10日
ファイルを前100列と残り163列に分割して検証します。
まずはpandas.merge()
Line # Mem usage Increment Line Contents ================================================ 5 88.871 MiB 0.000 MiB @profile 6 def main(): 7 1613.852 MiB 1524.980 MiB df1 = pd.read_csv('train_data_first.csv') 8 3381.113 MiB 1767.262 MiB df2 = pd.read_csv('train_data_last.csv') 9 10 5387.531 MiB 2006.418 MiB df = pd.merge(df1, df2, left_index=True, right_index=True) # パターン1 11 5387.777 MiB 0.246 MiB df = df1.merge(df2, left_index=True, right_index=True)) # パターン2 12 3381.242 MiB -2006.535 MiB df = pd.merge(df1, df2, left_index=True, right_index=True, copy=False) # パターン3
GNU版time計測は
- パターン1: 5.5GB
- パターン2: 5.5GB
- パターン3: 4.7GB
pd.merge()とpd.DataFrame.merge()ではメモリ使用量に差はないようです。
mergeの場合はcopy=Falseが有効です。参照で問題ない場合は指定しましょう。
次に、numpy.concatenate
Line # Mem usage Increment Line Contents ================================================ 6 88.859 MiB 0.000 MiB @profile 7 def main(): 8 1613.848 MiB 1524.988 MiB df1 = pd.read_csv('train_data_first.csv').values 9 3381.113 MiB 1767.266 MiB df2 = pd.read_csv('train_data_last.csv').values 10 11 5387.574 MiB 2006.461 MiB df = np.concatenate([df1, df2], axis=1) # パターン1 12 5387.645 MiB 0.070 MiB df = np.hstack([df1, df2]) # パターン2 13 5387.645 MiB 0.000 MiB df = np.c_[df1, df2] # パターン3
GNU版time計測は全て同じでした。
- パターン1: 5.5GB
- パターン2: 5.5GB
- パターン3: 5.5GB
copy指定しない場合は、numpy, pandasともに同じメモリ使用量でした。
私のツイートの数倍というのの再現は出来ませんでしたが、copy=Falseが良く効く場合だったのかもしれません。
KagglerのためのGit入門
お久しぶりです。絶賛ニートを楽しんでるtkm2261です。
今日はTwitterで意見が諸々散見されたので、私のKaggleでのgit活用法を共有しようと思います。
Gitの独学はハードルが高いですよね。。。
gitの運用は開発プロセスと密接なのであまり外に出てこなかったり、
実際に自分でやらないと覚えない系なので独学は結構難しいと思ってます。
(同じ理由で、テストコード系も広まらなかったり)
私も学生時代はSVNに馴染めなくて、業務でプロダクト開発して初めてバージョン管理が身についたタイプです。
最初はSourceTreeで慣れ親しんで、gitコマンドという流れを辿りました。いきなりGitコマンドのハードルの高さは理解してるつもりです。
とはいえ、慣れるとこんなに便利なものはないので、出来る限り平易に語りたいと思います。
Kaggle利用特化(一人開発用)の最低限な要素を詰めましたので参考になれば幸いです。
変なとこあればTwitterまで
KaggleでのGitの有用性
個人的には複数環境で並行でタスクをこなすので、無いとやってられないです。
AWSのスポットインスタンス、GCPマシン($300クーポンで確保)、自宅マシンの3マシンは大体いつも管理してます。
ある環境で上手く行ったものを、うまく行った箇所だけ全体に共有するのはGitが最適です。
勿論ですが作業ログが残るので結果の再現性に絶大な威力を発揮します。
フォルダ管理との長所短所
もう一つの有力な選択肢として、作業フォルダ丸ごとS3とかに定期的に上げてしまう方法が有力かと思います。
結論からいうと、たまにフォルダをs3に保存、普段はGitが最強かと思います。
長所
- 作業が楽
- ある地点に戻るのが楽
- データも一緒に保存するので、再現性が高い
短所
- サイズが大きいと時間がかかる
- 結局しっちゃかめっちゃかになる
- 別環境との同期がしづらい。
- 差分管理がしづらい
Git入門したてだと『Gitで全部やるぞー』と気負いがちですが、組み合わせて一番楽な方法を取りましょう。
とりあえずこれだけ覚えておけばOK
GitやGithubは複数人開発するために様々な機能がありますが、Kaggle用に作業ログと再現性のためなら至ってシンプルに使えます。
- 1: git add (gitにファイルを追加)
- git add *.py *.R
- git add -u
- 2: git status (現在の状態確認)
- 3: git commit -m “なにかコメント” (addしたファイルの変更確定)
- 4: git push origin master (commitをアップロード)
- 5: git pull (リモートの変更を手元に反映)
- 6: git checkout (ブランチ切替、ファイル変更取消)
- git checkout ファイル名
- git checkout ブランチ名orハッシュ値
- git checkout --ours ファイル名
- 7: git reset --hard (変更全取消)
これだけで問題ないでしょう。応用編でstashとbranchは触れますが使わなくてもいけます。
各コマンドの詳細は後述
Git初心者最大の問題: どうやってある時点に戻るの?
ここでgit revert使いたい方もいるかもしれませんが、revertは色々面倒なことになるので止めましょう。
筆者推奨の戻し方は超原始的に、戻したいファイルに上書くだけです。
- 別のフォルダに、新たにクローン
- git checkout ハッシュ値 で戻したいところのコミットに戻る
- このフォルダから戻したいファイルを、元のフォルダにコピー
あとはコミットコメントに戻した旨を書いておけば十分でしょう
Git初心者の気になりポイント
- Q: プルリクとか言うもの使った方が良いんじゃないの?
- A: 必要ありません。あれは複数人開発用&公開済みサービス用なのでKaggleでは使いません。業務でも公開前の最初期はプルリクでなくmaster pushでやる事も多いです。
- Q: コンフリクト怖い
- A: git checkout --ours ファイル名 を覚えるだけで大丈夫です。(後述)
- Q: ブランチってやつ切るんでしょ?
- A: 必要ありません。実験的なコードを残す方法として応用編で触れますが、実験的なコードはファイル名別にしてmaster pushで良いかと
- Q: master push怖い
- A: 自分しか使わないので存分に壊しましょう。最悪リポジトリ作り直せばいいですし
- Q: いまどういう状態かわからなくなった
- A: git statusでみれます。ただ初心者にはわかりづらいので慣れるまではSorceTreeを使いましょう
.gitignoreの運用
Git初心者ありがちなミスとして、巨大ファイルをgitに入れてしまうミスがあります。
これやると巨大ファイルの差分管理してしまい、元のファイル以上にgitを圧迫して、フォルダ管理より取り回しが悪いものになってしまいます
間違って入れたら、git rm --cached ファイル名 で消すしか無いですが、その前に.gitignoreを書いて未然に防ぎましょう。
data/ input/ *.csv *.gz *.pkl *~ *.swp
Kagglerなら公式カーネル準拠でinput/以下にデータを入れることが多いと思いますので指定しておくと安全です。
あと私は加工データをdata/以下によく入れてるので追加してあります。
各コマンド詳細
あとは補足内容なので、分からない箇所だけ参考にしてください。
1. git add (gitにファイルを追加)
ファイルをgit管理以下に追加するコマンドです。
- git add *.py *.R (PythonとかRのコードを追加)
- git add -u (変更のあった管理下に入ってるファイルを追加)
これで十分かと思います。git add -Aとかは.gitignoreがしっかり指定してあればOKですが、巨大ファイルを入れる危険性があるので微妙です。
2. git status (現在の状態確認)
いまのGitの状態を確認するコマンド。全部addが終わった後や、コミット前には必ず見る習慣を付けましょう。
3. git commit -m “なにかコメント” (addしたファイルの変更確定)
コメントをつけてaddしたファイルの変更を確定します。
コメントは適当でよいですが、作業が思い出せるレベルではあったほうが良いです。
Kagglerの場合は作業でわけられない時があると思うので、日付だけとかでも良いです
理想的には毎submit後にはcommitしてLBスコアとかをコメントに入れておくと良いです
4. git push origin master (commitをアップロード)
リモートにcommit内容をアップロードします。他環境から先にpushがあるとrejectされますが、その時は先にpullします。
Kaggleの場合はアップロードためらう要素はないので、commitしたらpushする癖を付けましょう。
5. git pull (リモートの変更を手元に反映)
リモート内容を手元に反映します。ブランチを指定した方が丁寧ですが、Kaggle用途なら省略可です。
他環境で先にpushがあると失敗します。手元の変更と被らなければpullするだけで解決しますが、失敗する場合は下記で対処。
- リモートのファイルを正としたい場合 ・・・ git checkout ファイル名 で戻してpullしましょう。手元の変更が全て要らないならgit reset --hard
- 手元のファイルを正としたい場合 ・・・ 手元ファイルを何処かにコピーして、↑でリモートのファイルを正とした後、コピーで戻しましょう
- 両方の変更を保持したい場合 ・・・ 対象ファイルをcommitしてpullしましょう。コンフリクトが起きる可能性がありますがgit checkout --ours ファイル名 を使いましょう。(後述)
6. git checkout (ブランチ切替、ファイル変更取消)
checkoutの用途としては、ブランチの切り替えとファイルの変更を取り消すの2つがありますが、
ここではブランチは使わないので、ファイルの変更を前のcommitに戻すコマンドとしてgit checkout ファイル名 を覚えておきましょう。
これは消してしまったファイルにも有効で、ミスってrmしてしまったらcheckoutでザオリクしましょう。
git checkout --ours ファイル名
度々登場しました、コンフリクトを解消するコマンドです。
勿論分かる人はファイルを直接編集してコンフリクト解消しても良いですが、Kagglerは手元のファイルを正としたい場合がほとんどと思います。
『--ours』の通り、手元環境を正としてコンフリクトを解決してくれます。一応、リモートを正とするgit checkout --theirsもあります。
7. git reset --hard (変更全取消)
通称バルスコマンド。手元の変更を全て破棄して直前commitの状態に戻します。
git管理下のファイルだけなので、作ったデータファイルとかは消えません。
応用編
git diff (ファイル差分確認)
前のcommitとの変更差分が見れます。ファイル名を指定するとそのファイルのdiffだけ見れます。
学習実行前のチェックに流すと良いです。
git rm --cached ファイル名 (git管理から除外)
ファイルをGit管理下から外します。--cachedが無いとファイル自体も消えるので注意
git log (コミット履歴確認)
コミット履歴を見れます。checkoutであるcommitに戻るときのハッシュ値を調べるのに重宝します。
git branch ブランチ名 (ブランチ作成)
ブランチを作ります。Kaggleではブランチで残したいケースは少ないと思いますが、Gitのメイン機能なので覚えて起きましょう。
作ったあとcheckoutで切り替えるのを忘れずに。
用途としては、
- 実験的なコード書いて、ファイル別にするのも面倒だからブランチとして残す
- configファイルを別にして、CPU用、並列実行用、GPU用に分ける
とか考えられます。
特に後者はdevelopブランチの変更をmaster, parallel, gpuブランチに適用みたいな、
hotfixをmasterとreleaseに適用する的な業務レベルのgit使いが要求されるのでKaggleではそこまでやらないかと
そこまで出来るgit使いはご自由どうぞ
git stash (手元変更の一時保存)
手元の変更を一時保存して、git stash popで元に戻せるコマンドです。
変更がかぶると、git stash popで戻せなくなることがあるので、これを使うならbranchを切りましょう
変更前のコードを見たいならgthubかBitbucketをブラウザから見ましょう
トラブルシューティング
何かあれば随時増やします
間違ってaddしてしまった
git reset --sorf HEAD ファイル名で外せます。
ただ新ファイルとかの場合は少し違うので、git statusすると外し方書いてあるので読みましょう。
間違ってcommitしてしまった。
git reset –soft HEAD^ で戻ります。
ファイルのadd忘れや、コメント直したいだけの場合はgit commit –amendで変更できます。
ただし、これはPush前の話なのでPushしちゃったら諦めて、直したコミットを更に上げましょう
commitログがちょっと汚くなるレベルなのでKaggleでは気にする事ないです。
gitってどのサービス使えば良いの?
サービス使わずに自分でホストしても良いですが。スポットインスタンスから見れるように
githubやBitbucketを使う方が良いでしょう。
特にBitbucketはプライベートリポジトリが無限に持てるのと、SourceTree連携があるので初心者にはBitbucketをオススメします
結局お前はどう使ってるの?
大した事はしてません。よくやるのは特徴量作成と学習は別のマシンでやって、上手くいったら各自pushで気が向いたらpullみたいな感じです。
ファイルがそもそも違うので、ほとんどコンフリクトも起こらないので単純です。
パラメタサーチの時は同じファイルいじりますが、変更が大きければcommitしてmergeしますが、コピペで片方に持ってって片方はgit resetで破棄とか普通にやります。
私は業務で慣れることが出来たので大体対処出来ますが、『何か起きたら。。。』といった不安がgit導入の最大障壁ですよね。。。
ニートになりました。
表題のとおり2017/07/01ニートとなりました。
詳細はTwitterに書いたとおりです
本日の出社を最後に、会社を退職しニートとなりました
— Takami Sato (@tkm2261) 2017年6月30日
就職予定はなく仕事も受けません
しばらくは、英語・Kaggle・競プロ・筋トレ・最適化勉強の
ガチ勢として生きていきます
退職エントリが長いやつに碌なのがいないと思ってるので、特にこれ以上ありません。
勉強会とか誘って頂けると超助かります。