from xml.etree.ElementTree import parse # 読み込み mapping = {} tree = parse('books.xml') for B in tree.findall('book'): isbn = B.attrib['isbn'] for T in B.findall('title'): mapping[isbn] = T.text pprint.pprint(mapping) # 書き込み。文字コードを指定し xml_declaration=True にすると、 # 先頭に宣言() が入る tree.write('out.xml', encoding="utf-8", xml_declaration=True)
2012年10月26日金曜日
python - etree を使った xml 解析
SAX よりも直感的
2012年10月25日木曜日
Book - 『ぼくが愛したゴウスト』 by 打海文三
『3652』で紹介されていたのを見て手にしてみたけど、面白かった。
ストーリーも良かったけど、文体が読みやすくて気に入ったから
この作家の本は他にも読んでみたいと思う。
ストーリーも良かったけど、文体が読みやすくて気に入ったから
この作家の本は他にも読んでみたいと思う。
python - SAX を使った xml のパース
パースされている最中に呼ばれる関数を実装する。
自前で状態を保存しなければならないので、入れ子が多いなど複雑なファイルのパースには向きそうにない。
import xml.sax, xml.sax.handler, pprint class BookHandler(xml.sax.handler.ContentHandler): def __init__(self): self.inTitle = False self.mapping = {} def startElement(self, name, attributes): if name == 'book': self.buffer = "" self.isbn = attributes["isbn"] elif name == "title": self.inTitle = True def characters(self, data): if self.inTitle: self.buffer += data def endElement(self, name): if name == "title": self.inTitle = False self.mapping[self.isbn] = self.buffer parser = xml.sax.make_parser() handler = BookHandler() parser.setContentHandler(handler) parser.parse('books.xml') pprint.pprint(handler.mapping) print handler.mapping
2012年10月21日日曜日
Book - 『3652』 by 伊坂幸太郎
伊坂幸太郎のエッセイ集。
面白かった。人の考え方を読むことは楽しい。
作家としてのプロ意識みたいな物もすごく感じられた。
特に以下のエッセイがお気に入り。
面白かった。人の考え方を読むことは楽しい。
作家としてのプロ意識みたいな物もすごく感じられた。
特に以下のエッセイがお気に入り。
- 「亡くなったけれど、ベンチにいる」人たちの声が聞こえる短編集
- 武田幸三という格闘家の存在
- 十年目に考えたこと。
2012年10月20日土曜日
python - sqlite3 : テーブル一覧の取得
sqlite_master から type="table" を select する
import sqlite3 conn = sqlite3.connect('sq.db') cursor = conn.cursor() cursor.execute('select * from sqlite_master WHERE type="table"') for item in cursor.fetchall(): print item
python - sqlite3 を使う
connect して cursor を作成してコマンドを実行する。
import sqlite3 conn = sqlite3.connect('sq.db') cursor = conn.cursor() cursor.execute('drop table people') create_table_command = 'create table people (name char(30), age int(4))' cursor.execute(create_table_command) cursor.execute('insert into people values (?, ?)', ('Aoki',50)) cursor.executemany('insert into people values (?, ?)', [('Ishikawa', 70), ('Ueno', 40)]) conn.commit() cursor.execute('select * from people') print cursor.fetchall() cursor.execute('select * from people where age >= 50') print cursor.fetchall() cursor.execute('update people set age=? where name = ?', (51, 'Aoki')) cursor.execute('select * from people where age >= 50') print cursor.fetchall()
Blogger - SyntaxHighlighter の導入
参考
BloggerにおけるSyntaxHighlighterの使い方
SyntaxHighlighter 行番号折り返し問題
テンプレートに以下の内容を追加。
一番下の css は Chrome で行数表示がバグってしまう問題に対応するためのもの。
BloggerにおけるSyntaxHighlighterの使い方
SyntaxHighlighter 行番号折り返し問題
テンプレートに以下の内容を追加。
一番下の css は Chrome で行数表示がバグってしまう問題に対応するためのもの。
python - shelve を使ったデータ保存
キー、バリュー形式で簡単にデータの保存ができる。
import shelve dbase = shelve.open('database') dbase['1'] = ['a', 'b', 'c'] dbase['2'] = 2 for key in dbase: print dbase[key] # shelv.open で writeback=True を指定しないと # これでは dbase['1'] の中身は変更されない。 dbase['1'].append('d') dbase['2'] = 'Two' for key in dbase: print dbase[key] # dbase['1'] の中身を変更する。 tmp = dbase['1'] tmp.append('d') dbase['1'] = tmp for key in dbase: print dbase[key] # データの削除 del dbase['1'] dbase.close()
2012年10月16日火曜日
python - socketserver の利用
socketserver ( 2.x 系では SocketServer ) を利用することで簡単にサーバー側のプログラムを作成できる
import SocketServer
class MyClientHandler(SocketServer.BaseRequestHandler):
def handle(self):
print(self.client_address, now())
time.sleep(5)
while True:
data = self.request.recv(1024)
if not data:
break
reply = 'Echo=>%s\' % data
self.request.send(reply.encode())
self.request.close()
if __name__=='__main__':
myaddr = ('', 1234)
server = SocketServer.ThreadingTCPServer(myaddr, MyClientHandler)
server.serve_forever()
import SocketServer
class MyClientHandler(SocketServer.BaseRequestHandler):
def handle(self):
print(self.client_address, now())
time.sleep(5)
while True:
data = self.request.recv(1024)
if not data:
break
reply = 'Echo=>%s\' % data
self.request.send(reply.encode())
self.request.close()
if __name__=='__main__':
myaddr = ('', 1234)
server = SocketServer.ThreadingTCPServer(myaddr, MyClientHandler)
server.serve_forever()
2012年10月15日月曜日
python - pickle モジュールによるバイナリデータ化
バイナリデータにできる。保存やソケット通信などに利用。
import pickle
# 変換
binary_data = pickle.dumps(data)
data = pickle.loads(binary_data)
# ファイル書き込み
file = open('data.bin', 'wb')
pickle.dump(data, file)
file.close()
# ファイル読み込み
file = open('data.bin', 'rb')
data = pickle.loads(file)
file.close()
import pickle
# 変換
binary_data = pickle.dumps(data)
data = pickle.loads(binary_data)
# ファイル書き込み
file = open('data.bin', 'wb')
pickle.dump(data, file)
file.close()
# ファイル読み込み
file = open('data.bin', 'rb')
data = pickle.loads(file)
file.close()
2012年10月14日日曜日
python - 正規表現
re モジュールを使う
re.MULTILINE フラグを使わないと、行頭 ^ と行末 $ が、それぞれ改行の直後、改行の直前に引っかからないので注意。(文字列の先頭と終端のみに引っかかる)
- re.search(pattern, text) : 一番最初のマッチを取ってくる
- re.findall(pattern, text) : 全てのマッチを取ってくる
- re.sub(pattern, sub, text) : pattern にマッチしたものを sub で置き換える
参考 : Python reモジュールで使える正規表現演算子
import re filetext = open('list_inserter.hpp').read() pattern = re.compile('#include\s+<.*$', re.MULTILINE) print re.findall(pattern, filetext)
2012年10月13日土曜日
python - cygwin + Tkinter で "no display name and no $DISPLAY environment variable"
参考 : Cannot launch git gui using cygwin on windows
[準備]
[実行前]
XWin Server を起動するのが少し面倒だし、そもそもインストールが必要なので他の人に使ってもらいづらい。
[準備]
- Cygwin の setup.exe で xinit と X-start-menu-icons をインストールする
[実行前]
- Windows のスタートメニューから XWin Server を起動する
- Cygwin のシェルで export DISPLAY=:0.0 と入力する or Python のスクリプト内で os.environ['DISPLAY'] = ':0.0' として環境変数を設定する
XWin Server を起動するのが少し面倒だし、そもそもインストールが必要なので他の人に使ってもらいづらい。
python - デフォルトプログラムでファイルを開く
os.startfile('filename') を使う。
# txt ファイルを開く
os.startfile('text.txt')
# Webページを開く
os.startfile('https://www.google.co.jp/webhp?hl=ja')
# txt ファイルを開く
os.startfile('text.txt')
# Webページを開く
os.startfile('https://www.google.co.jp/webhp?hl=ja')
2012年10月12日金曜日
python - modal ダイアログを作る
grab_set() で他のウィンドウにイベントが届かないようにし
wait_window() で終了を待つ
win = TopLevel()
# ...
win.focus_set()
win.grab_set()
win.wait_window()
wait_window() で終了を待つ
win = TopLevel()
# ...
win.focus_set()
win.grab_set()
win.wait_window()
python - 可変長引数
アスタリスク1つでキーワードなし(tuple)、
アスタリスク2つでキーワードあり(dictionary)
def flexible_tuple(*args):
print args
def flexible_dict(**kwargs):
print kwargs
def flexible(*args,**kwargs):
print args
print kwargs
flexible_tuple('a', 1, 2)
flexible_dict(a=1,b=2,c='c',d=[1,2,3])
flexible('a',1,2,a=1,b=2,c='c',d=[1,2,3])
2012年10月11日木曜日
python - os.path.normpath と os.path.normcase
パス名の比較、パスをキーにした辞書を作成する際等に使う。
- os.path.normpath : 余分な区切り文字や上位レベル参照を削除し、パスの名前を標準化する。Windows ならスラッシュをバックスラッシュに変換する。
- os.path.normcase : パス名の大文字、小文字をシステムの標準にする。 Windowsでは、スラッシュをバックスラッシュに変換する。
2012年10月10日水曜日
python - multiprocessing : Process のサブクラス化と Queue による通信
- サブクラス化する事で状態変数を保持できる
- Queue を使うことで安全に情報のやり取りができる
from multiprocessing import Process, Queue
class Counter(Process):
label = ' @'
def __init__(self, start, queue):
self.state = start
self.post = queue
Process.__init__(self)
def run(self):
for i in range(3):
time.sleep(1)
self.state += 1
print(self.label, self.pid, self.state)
self.post.put([self.pid, self.state])
print(self.label, self.pid, '-')
if __name__ == '__main__':
print('start',os.getpid())
expected = 9
post = Queue()
p = Counter(0, post)
q = Counter(100, post)
r = Counter(1000, post)
p.start()
q.start()
r.start()
while expected:
time.sleep(0.5)
try:
data = post.get(block=False)
except :
print 'no data...'
else:
print('posted:', data)
expected -= 1
p.join()
q.join()
r.join()
print('finish', os.getpid(), r.exitcode)
2012年10月9日火曜日
python - multiprocessing : Shared Memory
シェアするデータをパッケージ内のクラスとして作成し、Process の引数で渡す。
from multiprocessing import Process, Value, Array
procs = 3
count = 0 # グローバル変数はシェアされない
def showdata(label, val, arr):
msg = '%-12s: pid:%4s, global:%s, value:%s, array:%s'
print msg % (label, os.getpid(), count, val.value, list(arr))
def updater(val, arr):
global count
count += 1
val.value += 1
for i in range(3):
arr[i] += 1
if __name__ == '__main__':
scalar = Value('i',0)
vector = Array('d', procs)
showdata('parent start', scalar, vector)
p = Process(target=showdata, args=('child ', scalar, vector))
p.start()
p.join()
for i in range(procs):
count += 1
scalar.value += 1
vector[i] += 1
p = Process(target=showdata, args=('proc %s' % i, scalar, vector))
p.start()
p.join()
ps = []
for i in range(procs):
count += 1
scalar.value += 1
vector[i] += 1
p = Process(target=showdata, args=('proc %s' % i, scalar, vector))
p.start()
ps.append(p)
for p in ps:
p.join()
for i in range(procs):
p = Process(target=updater, args=(scalar, vector))
p.start()
p.join()
showdata('parent tmp', scalar, vector)
ps = []
for i in range(procs):
p = Process(target=updater, args=(scalar, vector))
p.start()
ps.append(p)
for p in ps:
p.join()
showdata('parent end', scalar, vector)
結果
parent start: pid:1468, global:0, value:0, array:[0.0, 0.0, 0.0]
child : pid:6496, global:0, value:0, array:[0.0, 0.0, 0.0]
proc 0 : pid:4480, global:0, value:1, array:[1.0, 0.0, 0.0]
proc 1 : pid:1776, global:0, value:2, array:[1.0, 1.0, 0.0]
proc 2 : pid:4896, global:0, value:3, array:[1.0, 1.0, 1.0]
proc 0 : pid:1436, global:0, value:6, array:[2.0, 2.0, 2.0]
proc 1 : pid:4008, global:0, value:6, array:[2.0, 2.0, 2.0]
proc 2 : pid:6352, global:0, value:6, array:[2.0, 2.0, 2.0]
parent tmp : pid:1468, global:6, value:9, array:[5.0, 5.0, 5.0]
parent end : pid:1468, global:6, value:12, array:[8.0, 8.0, 8.0]
proc * では global は常に 0
paremt tmp, parent end で global が 6 なのは main 内の count += 1 の影響で、Process の updater 分は加算されない
python - multiprocessing : Pipe による通信
from multiprocessing import Process, Pipe
def sender(pipe):
pipe.send(['spam'] + [42, 'eggs']) # pipe.send() で書き込み
pipe.close()
def talker(pipe):
pipe.send(dict(name='Bob', spam=42))
reply = pipe.recv() # pipe.recv() で読み込み
print('talker got', reply)
if __name__=='__main__':
parentEnd, childEnd = Pipe() # パイプを作成
Process(target=sender, args=(childEnd,)).start() # プロセスを始める
print('parent got:', parentEnd.recv())
parentEnd.close()
parentEnd, childEnd = Pipe()
child = Process(target=talker, args=(childEnd,))
child.start()
print('parent got:', parentEnd.recv())
parentEnd.send({x * 2 for x in 'spam'})
child.join()
print 'parent exit'
python - multiprocessing について
スレッドのように利用できるプロセスを作成する。(Windows と Linux で実装が異なる)
プロセスなので、マルチコアによる速度向上が狙える。
データのやり取りにはグローバル変数等は使用せず、Pipe, Shared Memory, Queue などを使う必要がある。('picklable' なオブジェクトしかやりとりできないという制限はある)
2012年10月8日月曜日
python - GIL : グローバルインタプリタロック
Wikipedia : グローバルインタプリタロック
グローバルインタプリタロック(英: Global Interpreter Lock, GIL)とは、プログラミング言語のインタプリタのスレッドによって保持されるスレッドセーフでないコードを、他のスレッドと共有してしまうことを防ぐための排他 ロックである。インタプリタのひとつのプロセスごとに必ずひとつの GIL が存在する。
Python (正確には C言語による実装であるCPython) はグローバルインタプリタロックを採用しているので、マルチプロセッサのマシンでマルチスレッドを実行しても全く速度の向上が見られない、との事。
グローバルインタプリタロック(英: Global Interpreter Lock, GIL)とは、プログラミング言語のインタプリタのスレッドによって保持されるスレッドセーフでないコードを、他のスレッドと共有してしまうことを防ぐための排他 ロックである。インタプリタのひとつのプロセスごとに必ずひとつの GIL が存在する。
Python (正確には C言語による実装であるCPython) はグローバルインタプリタロックを採用しているので、マルチプロセッサのマシンでマルチスレッドを実行しても全く速度の向上が見られない、との事。
python - socket を使ったプロセス間通信
#!/usr/bin/env python
# -*- coding: sjis -*-
import os
import sys
import threading
from socket import socket, AF_INET, SOCK_STREAM
# AF_INET = Adress Family : Internet Sockets IPv4 を指す
# SOCK_STREAM = 通信プロトコル TCP を使用。 SOCK_DGRAM だと UDP
port = 50008
host = 'localhost'
stdoutmutex = threading.Lock()
def server():
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(('', port)) # アドレスとポートを指定. '' は 'localhost' に置き換えても同じ模様
sock.listen(5) # 同時に接続できる個数
while True:
conn, addr = sock.accept()
data = conn.recv(1024) # bufsize
with stdoutmutex:
print data
reply = 'server got : [%s]\n' % data
conn.send(reply.encode())
def client(name):
sock = socket(AF_INET, SOCK_STREAM)
sock.connect((host, port))
sock.send(name.encode())
reply = sock.recv(1024)
sock.close()
with stdoutmutex:
print 'client got : [%s]' % reply
if __name__=='__main__':
argv = sys.argv
if len(argv) <= 1:
sys.exit(1)
elif argv[1] == '-s':
sthread = threading.Thread(target=server)
sthread.daemon = False
sthread.start()
elif argv[1] == '-c':
for i in range(5):
threading.Thread(target=client, args=('client % s' % i,)).start()
# -*- coding: sjis -*-
import os
import sys
import threading
from socket import socket, AF_INET, SOCK_STREAM
# AF_INET = Adress Family : Internet Sockets IPv4 を指す
# SOCK_STREAM = 通信プロトコル TCP を使用。 SOCK_DGRAM だと UDP
port = 50008
host = 'localhost'
stdoutmutex = threading.Lock()
def server():
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(('', port)) # アドレスとポートを指定. '' は 'localhost' に置き換えても同じ模様
sock.listen(5) # 同時に接続できる個数
while True:
conn, addr = sock.accept()
data = conn.recv(1024) # bufsize
with stdoutmutex:
print data
reply = 'server got : [%s]\n' % data
conn.send(reply.encode())
def client(name):
sock = socket(AF_INET, SOCK_STREAM)
sock.connect((host, port))
sock.send(name.encode())
reply = sock.recv(1024)
sock.close()
with stdoutmutex:
print 'client got : [%s]' % reply
if __name__=='__main__':
argv = sys.argv
if len(argv) <= 1:
sys.exit(1)
elif argv[1] == '-s':
sthread = threading.Thread(target=server)
sthread.daemon = False
sthread.start()
elif argv[1] == '-c':
for i in range(5):
threading.Thread(target=client, args=('client % s' % i,)).start()
2012年10月7日日曜日
python - 他のプログラムを起動して標準出力をノンブロックで監視する
Non-blocking read on a subprocess.PIPE in python
(更新)
上のリンクに書いてあるソースにコメントを加え、結果を利用する main 関数を追加。
#!/usr/bin/env python
# -*- coding: sjis -*-
import os
import sys
from subprocess import Popen, PIPE
import threading
import time
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty # python 3.x
# posix モジュールはオペレーティングシステムの機能のうち、C 言語標準 および POSIX 標準 (Unix インタフェースをほんの少し隠蔽した) で標準化されている機能に対するアクセス機構を提供する
# 非 Unix オペレーティングシステムでは posix モジュール を使うことはできない→要するに Unix 系か Windows かを判定している
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
# iter は第2引数があると、1番目の関数の返り値と2番目の返り値が一致した際に iteration が終わる
for line in iter(out.readline, b''):
queue.put(line.rstrip())
out.close()
print 'enqueue_output finished'
if __name__=='__main__':
argv = sys.argv
argc = len(argv)
if argc != 3:
sys.exit(1)
if not argv[2].isdigit():
sys.exit(1)
keyword = argv[1]
time_limit = int(argv[2])
print 'Keyword : %s' % keyword
print 'Time limit : %d' % time_limit
# stdout=PIPE : 新しいパイプが子プロセスに向けて作られる
# bufsize = 1 : 行毎にバッファ
# close_fds : Unix 系 の場合、 true にすると子プロセス実行前に 0, 1, 2 以外のファイルディスクリプタが閉じられる。
# Windowsの場合、 true にすると子プロセスにハンドルが継承されない。 Windows の場合は close_fds を true にしながら、 stdin, stdout, stderr を利用して標準ハンドルをリダイレクトすることはできない
p = Popen(['python','outputter.py'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # プログラムの終了と同時に終了する
t.start()
startTime = time.time()
keyword_found = False
while True:
if time.time() - startTime > time_limit:
print 'Time out. Keyword %s not found...' % keyword
keyword_found = False
break
try:
line = q.get_nowait() # タイムアウト無しで取得。何もなければ Empty 例外を出す
except Empty:
continue
# print('no output yet')
else: # got line
if line == keyword:
print 'Keyword %s found' % keyword
keyword_found = True
break
else:
print line
try:
p.kill()
except OSError:
pass
if keyword_found:
print 'Success'
else:
print 'False'
(更新)
上のリンクに書いてあるソースにコメントを加え、結果を利用する main 関数を追加。
#!/usr/bin/env python
# -*- coding: sjis -*-
import os
import sys
from subprocess import Popen, PIPE
import threading
import time
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty # python 3.x
# posix モジュールはオペレーティングシステムの機能のうち、C 言語標準 および POSIX 標準 (Unix インタフェースをほんの少し隠蔽した) で標準化されている機能に対するアクセス機構を提供する
# 非 Unix オペレーティングシステムでは posix モジュール を使うことはできない→要するに Unix 系か Windows かを判定している
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
# iter は第2引数があると、1番目の関数の返り値と2番目の返り値が一致した際に iteration が終わる
for line in iter(out.readline, b''):
queue.put(line.rstrip())
out.close()
print 'enqueue_output finished'
if __name__=='__main__':
argv = sys.argv
argc = len(argv)
if argc != 3:
sys.exit(1)
if not argv[2].isdigit():
sys.exit(1)
keyword = argv[1]
time_limit = int(argv[2])
print 'Keyword : %s' % keyword
print 'Time limit : %d' % time_limit
# stdout=PIPE : 新しいパイプが子プロセスに向けて作られる
# bufsize = 1 : 行毎にバッファ
# close_fds : Unix 系 の場合、 true にすると子プロセス実行前に 0, 1, 2 以外のファイルディスクリプタが閉じられる。
# Windowsの場合、 true にすると子プロセスにハンドルが継承されない。 Windows の場合は close_fds を true にしながら、 stdin, stdout, stderr を利用して標準ハンドルをリダイレクトすることはできない
p = Popen(['python','outputter.py'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # プログラムの終了と同時に終了する
t.start()
startTime = time.time()
keyword_found = False
while True:
if time.time() - startTime > time_limit:
print 'Time out. Keyword %s not found...' % keyword
keyword_found = False
break
try:
line = q.get_nowait() # タイムアウト無しで取得。何もなければ Empty 例外を出す
except Empty:
continue
# print('no output yet')
else: # got line
if line == keyword:
print 'Keyword %s found' % keyword
keyword_found = True
break
else:
print line
try:
p.kill()
except OSError:
pass
if keyword_found:
print 'Success'
else:
print 'False'
python - thread と threading
thread.start_new_thread で生成したスレッドは main スレッドの終了と共に終了する。
threading で生成されたスレッドを daemon = True として設定しない場合
そのスレッドが終了するまで main スレッドは終了しない。
threading で生成されたスレッドを daemon = True として設定しない場合
そのスレッドが終了するまで main スレッドは終了しない。
python - mutex と with
with を使って、mutex.acruire() と mutex.release() を置き換える事ができる
import thread
stdoutmutex = thread.allocate_lock()
with stdoutmutex:
print 'Hello'
# 変更前
# stdoutmutex.acquire()
# print 'Hello'
# stdoutmutex.release()
import thread
stdoutmutex = thread.allocate_lock()
with stdoutmutex:
print 'Hello'
# 変更前
# stdoutmutex.acquire()
# print 'Hello'
# stdoutmutex.release()
2012年10月6日土曜日
python - os.exec** シリーズ
execv, execl, execvp, execlp, execle, execve, execvpe, execlpe 等色々あるけれど…
- v と l の違い
- v は tuple もしくは list で実行するプログラムの引数を指定する
- execvp('python', ['python', 'test.py'])
- l は文字列を指定する
- execlp('python', 'python', 'test.py')
- p を付加すると実行するプログラムをシステムパス (ie. PATH) から探す
- e を付加すると実行時に指定する環境変数の Dictionary を最後の引数で指定できる。指定するとこの関数を呼んだプログラムが持っていた環境変数(=PCにデフォルトで設定されている環境変数)は引き継がれないので、追加する場合は os.environ で取得した環境変数に追加したものを渡す
python - 絶対パスのファイル文字列からフォルダ名とファイル名を取得する
abs_path = 'C:\Python\Text.txt'
head, tail = os.path.split(abs_path)
# head = 'C:\Python'
# tail = 'Text.txt'
head がディレクトリ名、 tail がファイル名として取得できる。
os.path.join(head, tail) で元に戻せる。
head, tail = os.path.split(abs_path)
# head = 'C:\Python'
# tail = 'Text.txt'
head がディレクトリ名、 tail がファイル名として取得できる。
os.path.join(head, tail) で元に戻せる。
python - Binary データを扱う
struct モジュールを使う
あまり直接使う機会は多くないと思うけど、知識として。
import struct
# Binary データの作成。最初の文字列はフォーマット文字列
i は int, f は float, 8s は 8文字の文字列
data = struct.pack('iif8s', 128, 80, 2.34, '12345678')
# Binary データの読み込み。
values = struct.unpack('iif8s', data)
values は tuple になる。
ファイルに書き込み、読み込みするときはバイナリデータとして扱うために wb, rb にすることを忘れないように注意。
あまり直接使う機会は多くないと思うけど、知識として。
import struct
# Binary データの作成。最初の文字列はフォーマット文字列
i は int, f は float, 8s は 8文字の文字列
data = struct.pack('iif8s', 128, 80, 2.34, '12345678')
# Binary データの読み込み。
values = struct.unpack('iif8s', data)
values は tuple になる。
ファイルに書き込み、読み込みするときはバイナリデータとして扱うために wb, rb にすることを忘れないように注意。
python - 他のプログラムの標準出力を監視する
ブロックしないようにタイムアウト付きの入力を使って
スレッドで時間経過を監視…しているけど、別にスレッド使う必要はなかったような。
少し前に書いた物なので意図を忘れてしまった。
import os
import sys
import time
import threading
import select
startTime = time.time()
endTime = startTime
def inputWithTimeout(timeout_time):
i, o, e = select.select([sys.stdin], [], [], timeout_time)
if (i):
return sys.stdin.readline().strip()
else:
return None
def printUsage():
print 'python ' + os.path.basename(__file__) + ' [KEYWORD] [TIMEOUT(S)]'
def timeoutMonitorWorker():
timeout = False
while not timeout:
now = time.time()
if now > endTime:
timeout = True
try:
time.sleep(1)
except KeyboardInterrupt:
timeout = True
if __name__ == '__main__':
argv = sys.argv
argc = len(argv)
if argc != 3:
print printUsage()
sys.exit(1)
endTime += int(argv[2])
timeOutMonitorThread = threading.Thread(target=timeoutMonitorWorker)
timeOutMonitorThread.daemon = True
timeOutMonitorThread.start()
while True:
try:
if not timeOutMonitorThread.isAlive():
print 'Monitor : Timeout...'
sys.exit(-1)
stdInput = inputWithTimeout(1)
if stdInput == "":
print 'EOF Detected...'
sys.exit(-1)
if stdInput == None:
continue
print stdInput
if stdInput == argv[1]:
print 'Monitor : ' + 'Keyword [%s] found' % argv[1]
sys.exit()
except EOFError:
print 'Monitor : EOF detected...'
sys.exit(-1)
登録:
投稿 (Atom)