2015/04/13

psycopg2 でリアルタイムに PostgreSQL のメッセージ取得

[Python][PostgreSQL]
先週、Python と PostgreSQL でポータブルな Web サーバ & クライアントを作った際、↓ の課題が残っていました。RAISE INFO or NOTICE で PostgreSQL からメッセージを出しても、それを受け取るのはリアルタイムでなく、クエリ終了後にまとめてになってしまうという。何か方法はないかな〜と調べたら、ちやんと psycopg2 に非同期(asynchronous)モードがあってあっさり出来ました。


↓ ドキュメントの該当箇所と、日本語での psycopg2 非同期モード使用例です。

■ Psycopg 2.6 documentation - More advanced topics
■ [Python] greenletを使う場合にはPostgreSQLを使う方がいいという話

今日はスクリプトでのひな形をメモ。Web サーバ & クライアントに載せるのは次の機会にします。非同期でない普通のと合わせて ↓ のテスト用ファイル群を作りました。まとめた ZIP はこちら。実行環境はこれまでと同様 Windows 7 32bit + QGIS ポータブル版にあった Python 2.7 です。


↓ 二つのバッチファイルは同じ中身で、ファイル名によって起動するスクリプトを振り分けます。同期 or 非同期の二つのテスト用ボタンを作ったのと同じ。

■ psycopg2_not_async_test.py.bat
■ psycopg2_async_test.py.bat
@echo off
cd "../QGIS/bin/"
set PYTHONHOME=../apps/Python27
set PYTHONPATH=../apps/Python27/Lib
python "../../python_script/%~n0"
pause > nul


↓ 二つのスクリプトから、この SQL ファイルを読んで実行します。pg_sleep の部分が、実際は時間のかかる処理の想定。節目節目で「ここまで終わったよ」と知らせてもらいます。その際、時刻を now() で取るとすべて同じ(トランザクション開始時刻)になるので、clock_timestamp() を使用。

■ sample_raise_info.sql
-- encoding: UTF-8

DO $$
BEGIN
RAISE INFO '% Started', clock_timestamp() :: time ;

PERFORM pg_sleep(1) ; -- // 時間かかる処理

RAISE INFO '% Completed 1/3', clock_timestamp() :: time ;

PERFORM pg_sleep(1) ; -- // 時間かかる処理

RAISE INFO '% Completed 2/3', clock_timestamp() :: time ;

PERFORM pg_sleep(1) ; -- // 時間かかる処理

RAISE INFO '% Completed 3/3', clock_timestamp() :: time ;
END; $$ ;


↓ 最初に、普通の(非同期でない)スクリプトと実行結果。画面だけ見ると一秒おきに PostgreSQL からメッセージが出たみたいですが、実際はクエリ終了まで真っ黒で、最後にまとめて表示されます。クエリ開始〜終了の間、Python 側では何もできない状態。

■ psycopg2_not_async_test.py
# coding: utf-8

import os
import psycopg2
import sys

fname_sql = '/sample_raise_info.sql'
fpath_sql = os.path.abspath(os.path.dirname(__file__)) + fname_sql
with open(fpath_sql) as f:
sql = f.read()

conn = psycopg2.connect(user='postgres') # // 接続パラメータは適宜設定
curs = conn.cursor()
curs.execute(sql) # // ここの開始〜終了まで、何もできない
curs.close()
conn.close()

sys.stdout.write(''.join(conn.notices))


↓ 本題の非同期モード使用例。関数 wait は先ほどのドキュメント More advanced topics そのままに、一行だけ別関数 user_action に飛ばす行を挿入しました。user_action 内で PostgreSQL からのメッセージを表示。その際、実際の時刻を比較用に付けています。

■ psycopg2_async_test.py
# coding: utf-8

import os
import psycopg2
import psycopg2.extensions # // 追加
import select # // 追加
import sys
import time

def user_action(conn):
mes = conn.notices
if len(mes) == 0:
return
chr = mes[-1] # // 追加されたメッセージだけ取得
sys.stdout.write(str(time.strftime('%H:%M:%S')) + ', ' + chr)

def wait(conn):
while 1:
state = conn.poll()
user_action(conn) # // クエリ実行中の処理を挿入
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)

if __name__ == '__main__':

fname_sql = '/sample_raise_info.sql'
fpath_sql = os.path.abspath(os.path.dirname(__file__)) + fname_sql
with open(fpath_sql) as f:
sql = f.read()

aconn = psycopg2.connect(user='postgres', async=1) # // 接続パラメータは適宜設定
wait(aconn)
acurs = aconn.cursor()
acurs.execute(sql) # // ここの開始〜終了の間、次行が実行される
wait(acurs.connection)
acurs.close()
aconn.close()


上の結果で、左側の時刻が Python で画面出力した時、INFO: の右が PostgreSQL 側でメッセージを発した時刻。画像では伝わりづらいですが、期待どおり一秒おきにメッセージ表示されました。(動画にしたいと思いつつ、キャプチャするのが面倒で ^^;)

PostgreSQL からのメッセージは、connection クラスの notices リストに順次追加されていきます。上の user_action 関数では、そのリストの末端要素だけを取り出しましたが、そうしないでリスト全体を常に表示すると ↓ こんな感じ。


ただ、何かのタイミングで最後に二回 user_action が呼ばれる ↓ 時がありました。対策がないか、今後調べます。弥縫的には、notices の要素数を保存して、要素数の変化をチェックするとか…。


実用上は、コンソールでするだけなら psql で済みます。psycopg2 での方法を調べたのは、先週のように Web クライアントで同じことができると面白そうだと思ったから。進展したら別途記事にします。
×

この広告は1年以上新しい記事の投稿がないブログに表示されております。