2015/04/18

pywebsocket & PostgreSQL の Web クライアント(暫定)

[WebSocket][Python][PostgreSQL]
下記三記事の合体版です。実行環境はこれまでと同じ Windows 7 32bit + Python 2.7 + PostgreSQL Portable 9.4 + Firefox Portable 36。

■ ポータブルな Firefox & Python で WebSocket 通信テスト(2015/04/16)
■ pywebsocket でブラウザ ⇒ サーバに処理中止を指示(暫定)(2015/04/17)
■ psycopg2 でリアルタイムに PostgreSQL のメッセージ取得(2015/04/13)

処理中止の部分は multiprocessing のプロセス間通信を使えばできるかもしれないと、後から気付いたので、今日のは暫定版です。下の動画では

・ 普通に SELECT 文を実行
・ SELECT の前に DO 文で一秒ずつ時間を表示(重い処理の進行状況の代わり)
・ 上の処理を、途中で中止
・ DO の時間間隔を 0.5 秒と 0.1 秒に変えて実行

を試しています。

■ 20150418_demo.mp4


余談ですが PostgreSQL で floor で整数に切り下げても、型が整数でないと Python で文字化した時に自動的に .0 が付くようです。

ソース一式は 20150418_demo.zip で、ポータブル版 QGIS の下に置いた ↓ フォルダのうち、mod_pywebsocket パッケージ(公式サイトはこちら)以外が入っています。


pywebsocket のスタンドアロンサーバを起動する ctr_pywebsocket.py と test_pywebsocket.bat は 2015/04/16 と同じで、バッチファイル実行一つでサーバを起動・再起動し、コマンドプロンプトを最小化します。表示させると ↓ こんな感じ。


クライアント側は index.html, main.css, main.js の三つで構成。JavaScript でサーバと通信 & 結果表示をします。結果はまず一つの DIV タグに入れ、テーブルがあれば位置を絶対指定で移動。

■ index.html
<html><head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Portable pywebsocket + psycopg2 + PostgreSQL : Test 1</title>
<link rel="stylesheet" type="text/css" href="main.css" />
<script type="text/javascript" src="main.js"></script>
</head>
<body>
<div id="title"></div>
<div>
Connection :<input id="con" type="text" /><br />
<textarea id="sql"></textarea><br />
<input type="button" id="button1" />
</div>
<div id="res"></div>
</body>
</html>

■ main.css
@charset "utf-8";

body {
font-family: monospace
; font-size: 12px
; padding: 10px
}

#title {
margin: 0 0 15px
}

#con {
font-size: 11px
; margin-left: 8px
; padding: 2px 4px
}

#sql {
font-size: 11px
; line-height: 1.4em
; margin: 5px 0 0 0
; padding: 5px
; resize: vertical
}

#button1 {
margin: 5px 0 0 0
}

#res {
font-size: 11px
; line-height: 1.4em
; padding: 0 0 0 10px
}

table {
border-collapse: collapse
; border-top: 1px solid black
; border-left: 1px solid black
; left: 360px
; position: absolute
; top: 20px
; margin: 0 0 20px
}

th {
background: wheat
}

th, td {
border-bottom: 1px solid black
; border-right: 1px solid black
; font-family:monospace
; font-size:12px
; padding:2px 4px
}

■ main.js
// coding: utf-8

function $(x) { return document.getElementById(x) }
function $tag(x) { return document.getElementsByTagName(x) }

window.addEventListener('load', function() {
$('title').textContent = document.title;

var width_1 = 80;
var width_2 = 240;
$('con').style.width = width_2 + 'px';
$('button1').style.marginLeft = (width_2 + 1) + 'px';
$('button1').style.width = width_1 + 'px';
$('sql').style.height = '180px';
$('sql').style.width = (width_1 + width_2) + 'px';
$('sql').focus();
disp_cancel(1);
});

function add_res(str, clear) {
var r = $('res');
if (clear) {
r.innerHTML = str;
} else {
r.innerHTML += str;
}
if (str.indexOf('<table>') == -1) {
r.innerHTML += '<br />';
}
}

function disp_cancel(x) {
var b = $('button1');
if (x > 0) {
b.value = '送信';
b.style.color = 'black';
b.onclick = action;
} else {
b.value = '中止';
b.style.color = 'red';
b.onclick = cancel;
}
}

function action(){
var con = $('con').value;
var sql = $('sql').value;
var dat = 'con=' + con + '&sql=' + encodeURIComponent(sql);

var res = $('res');
res.cancelkey = '';
res.url = 'ws://localhost:8180/test';
var socket = new WebSocket(res.url);

socket.onopen = function() {
socket.send(dat);
add_res('サーバから受信開始...', 1);
}
socket.onmessage = function(e) {
if (e.data.indexOf('cancelkey=') == 0) {
res.cancelkey = e.data
disp_cancel(0);
return
}
add_res(e.data);
}
socket.onerror = function() {
add_res('エラー発生');
disp_cancel(1);
socket = undefined;
}
socket.onclose = function() {
add_res('サーバからの受信終了');
disp_cancel(1);
socket = undefined;
}
}

function cancel(){
var res = $('res');
if (res.cancelkey == '') {
alert('no cancelkey');
return
}
var socket = new WebSocket(res.url);
socket.onopen = function() {
socket.send(res.cancelkey);
disp_cancel(1);
}
socket.onmessage = function(e) {
add_res(e.data);
}
socket.onclose = function() {
socket = undefined;
}
}


以下サーバ側のスクリプト。↓ が WebSocket でアクセスされる玄関口で、今回は処理を全て外部モジュールに渡すようにしました。

■ test_wsh.py
# coding: utf-8

import usr_psycopg2

def web_socket_do_extra_handshake(request):
pass

def web_socket_transfer_data(request):
while True:
line = request.ws_stream.receive_message()
if isinstance(line, unicode):
usr_psycopg2.main(request, line)
return


上のスクリプトから呼ばれるモジュールは、初回実行時にコンパイルされて .pyc ファイルができます。処理の中心が ↓ で、psycopg2 の非同期モードで PostgreSQL に接続、リアルタイムで RAISE INFO などのメッセージを受け取って WebSocket で返しつつ、最後にクエリ結果を送信。

■ usr_psycopg2.py
# coding: utf-8

import cgi
import codecs
import datetime
import psycopg2
import psycopg2.extensions
import select
import usr_cancel
import usr_xhtml

def main(req, line):
if usr_cancel.exec1(line):
ws_send(req, '中止指示を受信')
return
query(req, line)

def query(req, line):
dic = cgi.parse_qs(line.encode('utf-8'))
try:
con = dic['con'][0]
sql = dic['sql'][0]
except:
ws_send(req, 'input for connection or query.')
return

aconn = conn_open(req, con)
if aconn is None:
return
ckey = usr_cancel.start()
ws_send(req, 'cancelkey=' + ckey, 0)

try:
acurs = aconn.cursor()
acurs.execute(sql)
ws_send(req, 'query started')
wait(req, acurs.connection, ckey)

except psycopg2.Error as e:
if e.pgerror is None:
return
err = e.pgerror.replace('<', '&lt;')
err = err.replace(' ', '&nbsp;')
err = err.rstrip('\n').replace('\n', '<br />')
ws_send(req, err, 0)
conn_close(req, aconn)
return

ws_send(req, 'query ended')
usr_cancel.end(ckey)
try:
col = acurs.description
dat = acurs.fetchall()
except:
col = []
dat = []

acurs.close()
conn_close(req, aconn)

if len(col) == 0:
ws_send(req, 'クエリの結果出力はありません')
else:
ws_send(req, usr_xhtml.output(col, dat), 0)

def conn_open(req, con):
try:
aconn = psycopg2.connect(con, async=1)
wait(req, aconn)
except :
ws_send(req, 'unable to connect to the database.')
return

ws_send(req, 'connected')
return aconn

def conn_close(req, conn):
conn.close()
ws_send(req, 'disconnected')

def wait(req, conn, ckey=None):
note_pre = [0]
while True:
if ckey is not None and usr_cancel.exec2(ckey):
conn.cancel()
ws_send(req, '中止しました。ロールバックします')
conn.rollback()
state = conn.poll()
send_notices(req, conn, note_pre)
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)

def send_notices(req, conn, note_pre):
note = conn.notices
if len(note) == note_pre[0]: # 新規メッセージなければ終了
return
ws_send(req, note[-1]) # 新規メッセージだけ送信
note_pre[0] += 1

def ws_send(req, mes, addtime = 1):
if addtime:
mes = datetime.datetime.now().strftime('%H:%M:%S') + ' ' + mes
req.ws_stream.send_message(mes.decode('utf-8'))


上のモジュールから、下二つを呼び出し。中止処理を行う usr_cancel.py の方は、いずれ multiprocessing のプロセス間通信で置き換えたら不要になりますが、暫定版として。usr_xhtml.py はクエリ結果を単純な HTML テーブルにします。

■ usr_cancel.py
# coding: utf-8

import os
import random

def start():
# 乱数名でファイル作成
f_rand = str(random.random())
with open(f_rand, 'w') as f : pass
return f_rand

def end(ckey):
if os.path.isfile(ckey):
os.remove(ckey)

def exec1(line):
keystr = 'cancelkey='
if line.find(keystr) == 0:
f_rand = line[ len(keystr) : ]
if os.path.isfile(f_rand):
os.remove(f_rand)
return True
return False

def exec2(ckey):
if os.path.isfile(ckey) == False:
return True
return False

■ usr_xhtml.py
# coding: utf-8

def output(col, dat):
if len(col) == 0:
return

res = []
columns = [d[0] for d in col]
res.append('<table>')
tr = []

for c in columns :
s = str(c).decode('utf-8').encode('utf-8')
tr.append('<th>' + s + '</th>')
res.append(''.join(tr))

for r in dat:
tr = ['<tr>']
for c in r:
s = str(c).decode('utf-8').encode('utf-8')
tr.append('<td>' + s + '</td>')
tr.append('</tr>')
res.append(''.join(tr))
res.append('</table>')
return ''.join(res)


一応日本語でのクエリも可能ですが、作った後 websocket-sample を見たら、ブラウザ側で送信する時にエンコードするなと(not to use encodeURIComponent)…後で修正します。また時々、送信ボタン押下で二重にリクエストしてしまう現象があり、対策を考え中。

クエリの中止処理では、psycopg2 のサイトにある Cancelling PostgreSQL statements from Python と同様 cancel() を使用。その後 rollback() していますが、ドキュメントの非同期処理の項には commit() と rollback() が使えないとあり、正直よく分かりません。まだ作成・更新系クエリは試してないし、もっと調べていずれ書きます。
×

この広告は1年以上新しい記事の投稿がないブログに表示されております。