まちBBS の スレッドをElasticsearch に突っ込んでみる

今日はスレッドリストができたのでスレッドを取得してみようと思います。
まちBBS の スレッドリストをElasticsearch に突っ込んでみる - moremagicの日記 で作成した
出力物 subback.json からスレッドのURLを取得して順次DL、JSON化していくスクリプトです。

Python 2.7.9 で作っています。
もちろん JSON の構造は適当です。

thread-get.py

#!/usr/bin/python
# coding: UTF-8

import contextlib
import urllib2
import json
import os, sys
import time

reload(sys)
sys.setdefaultencoding('utf-8')

def getThread(url):
    ret = ''
    with contextlib.closing(urllib2.urlopen(url)) as resp:
        html = resp.read()

    for line in html.split('\n'):
        if line.strip().startswith('<dt>') & line.strip().endswith('<br>'):
            try:
                idx = line[line.index('<dt>')+4:line.index(' ')]
                name = line[line.index('<b> ')+4:line.index(' ', line.index('<b> ')+4)]
                
                datebuf = line[line.index(' 投稿日'.encode('shift_jis'))+10:line.index('<font ', line.index(' 投稿日'.encode('shift_jis')))].split(' ID:')
                date = datebuf[0]
                ID = ''
                if (len(datebuf) >= 2):
                    ID = datebuf[1]
                provider = line[line.index('<font size=1>[ ')+15:line.index(']', line.index('<font size=1>[ '))].strip()
                body = line[line.index('<dd>'):]
                mail = ''
                if(line.find('href="mailto:') != -1):
                    mail = line[line.rindex('href="mailto:')+6:line.index('"', line.index('href="mailto:')+13)]

                ret += '{"index": {} }\n{ "idx": ' + idx + ', "mail":"' + jsonEscape(conv_encoding(mail)[0]) + '", "name":"' + jsonEscape(conv_encoding(name)[0]) + '", "date":"' + conv_encoding(date)[0] + '", "id":"' + conv_encoding(ID)[0] + '", "provider":"' + conv_encoding(provider)[0] + '", "URL":"' + url[url.rindex('/')+1:] + '", "body":"' + jsonEscape(conv_encoding(body)[0]) + '"}\n'
            except Exception as err:
                print err
                print 'err:'+ url
                print line
                sys.exit(-1)
                

    return ret

def jsonEscape(s):
    return s.replace('\\', '\\\\').replace('"', '\\"').replace('/', '\\/').replace('\b', '\\b').replace('\f', '\\f').replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')

# 参考; http://qiita.com/zarchis/items/3258562ebc9570fa05a3
def conv_encoding(data):
    lookup = ('utf_8', 'euc_jp', 'euc_jis_2004', 'euc_jisx0213',
            'shift_jis', 'shift_jis_2004','shift_jisx0213',
            'iso2022jp', 'iso2022_jp_1', 'iso2022_jp_2', 'iso2022_jp_3',
            'iso2022_jp_ext','latin_1', 'ascii')
    encode = None
    for encoding in lookup:
      try:
        data = data.decode(encoding)
        encode = encoding
        break
      except:
        pass
    if isinstance(data, unicode):
        return data,encode
    else:
        raise LookupErrorconv_encoding


def getLinkUrl(fPath):
    ret = []
    with open(fPath, 'r') as f:
        for line in f.readlines():
            try:
                jsonData = json.loads(line)
                if 'link' in jsonData:
                    ret.append( jsonData['link'] )
                else:
                    pass
            except:
                pass;
    return ret

def getThreadJson(url):
    #url = 'http://machi.to/bbs/read.cgi/kana/1383474622'
    enc_ret = ''
    for l in getThread(url).split('\n'):
        #print conv_encoding(l)[1]
        enc_ret += l.decode(conv_encoding(l)[1]).encode('utf-8') + '\n'
    return enc_ret


cnt = 0
urllist = getLinkUrl('subback.json')
#urllist = ['http://machi.to/bbs/read.cgi/hokkaidou/1422793471/l50']
for url in urllist:
    url = url[:url.rindex('/')]
    cnt = cnt+1
    print url + '  ' + str(cnt) + '/' + str(len(urllist))
    with open(url[len('http://machi.to/bbs/read.cgi/'):].replace('/', '-') + '.dat', 'w') as f:
        f.write(getThreadJson(url))
        time.sleep(0.3)

おもむろに実行!
7h 程度回してみましたが まったく終わらないので中断・・・orz

# python thread-get.py

※ 中断

※ それでもこれだけのスレッドのデータが取れました
# ll *.dat | wc -l
10219


早速、出来上がったデータを
昨日作った Elasticsearch に突っ込んでみます。

全データだとHDD的にもPC的にもつらそうなので 神奈川県のスレッドのみを突っ込んでみます。
例のごとく Type とか適当に突っ込んでます。

# curl -X POST http://192.168.1.6:9201/machibbs/thread/_bulk --data-binary @- `cat kana*.dat`
-bash: fork: Cannot allocate memory

メモリが足りないと。うーん。catは辛いか。。。
しょうがないので find で順次突っ込むことにします。

# find . -name "kana*.dat" -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/thread/_bulk --data-binary @\{\} \;

※ なんかいっぱい出てきてうまくいった


確認するとこうなってる。
なるほど。こうやればいいのね。


■まとめて流し込む

というわけで今までの作業を一気に行い、Elasticsearchに突っ込む方法をめも。
いろいろ弄った結果 すべてのJSONデータを取るのに 5h 程度
全部で 3G 程度の大きさになりました。

python ./subback-get.py > subback.json && python ./thread-get.py 
find . -name "hokkaidou*.dat" -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/hokkaidou/_bulk --data-binary @\{\} \;
find . -name "tohoku*.dat"    -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/tohoku/_bulk    --data-binary @\{\} \;
find . -name "kousinetu*.dat" -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/kousinetu/_bulk --data-binary @\{\} \;
find . -name "kanto*.dat"     -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/kanto/_bulk     --data-binary @\{\} \;
find . -name "tokyo*.dat"     -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/tokyo/_bulk     --data-binary @\{\} \;
find . -name "tama*.dat"      -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/tama/_bulk      --data-binary @\{\} \;
find . -name "kana*.dat"      -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/kana/_bulk      --data-binary @\{\} \;
find . -name "tokai*.dat"     -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/tokai/_bulk     --data-binary @\{\} \;
find . -name "kinki*.dat"     -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/kinki/_bulk     --data-binary @\{\} \;
find . -name "osaka*.dat"     -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/osaka/_bulk     --data-binary @\{\} \;
find . -name "cyugoku*.dat"   -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/cyugoku/_bulk   --data-binary @\{\} \;
find . -name "sikoku*.dat"    -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/sikoku/_bulk    --data-binary @\{\} \;
find . -name "kyusyu*.dat"    -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/kyusyu/_bulk    --data-binary @\{\} \;
find . -name "okinawa*.dat"   -type f -exec curl -X POST http://192.168.1.6:9201/machibbs/okinawa/_bulk   --data-binary @\{\} \;

Elasticsearchでクラスタ構成をとっているとデータの流し込みに時間がかかる見たい。
何時間かかっても終わらない。

というわけで
一台構成にして一気に流し込んでみたら なんかエラーになった。。。

HDD使用量 も 45% 程度だから HDDがなくなったわけではなさそう。
Elasticsearch も生きている。

データが多すぎると駄目なの・・・?
うぅーーん・・よくわかんない。