# PyParallel (Fast)
# (Uses picohttpparser's SSE-accelerated HTTP parser.)
import parallel

class Plaintext:
    http11 = True

    def plaintext(self, transport, data):
        return b'Hello, World!'

server = parallel.server('0.0.0.0', 8080)
parallel.register(transport=server, protocol=Plaintext)
parallel.run()

# PyParallel (Slow)
# (Uses the Python-based HTTP parser.)
import parallel
from parallel.http.server import (
    router,
    make_routes,
    text_response,
    HttpServer,
)

routes = make_routes()
route = router(routes)

class Plaintext(HttpServer):
    @route
    def plaintext(self, request):
        return text_response(request, text='Hello, World!')

server = parallel.server('0.0.0.0', 8080)
parallel.register(transport=server, protocol=Plaintext)
parallel.run()
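# In both variants the URL path is derived from the handler method name, so
# the handlers above are served at /plaintext. A quick smoke test from a
# shell (hypothetical session; assumes the server is running locally):
#
#   $ curl http://localhost:8080/plaintext
#   Hello, World!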
# Implementation of the entire TEFB (TechEmpower Framework Benchmarks) suite
# in pure Python.
#===============================================================================
# Imports
#===============================================================================
import sys
import json
import pyodbc
import parallel

from ctk.util import (
    try_int,
    Dict,
)

from parallel import (
    rdtsc,
    sys_stats,
    socket_stats,
    memory_stats,
    context_stats,
    enable_heap_override,
    disable_heap_override,
    call_from_main_thread,
    call_from_main_thread_and_wait,
)

from parallel.http.server import (
    quote_html,
    router,
    make_routes,
    text_response,
    html_response,
    json_serialization,
    Request,
    HttpServer,
)
# It can be a pain setting up the environment to load the debug version of
# NumPy, so, if we can't import it, just default to normal Python random.
np = None
if not sys.executable.endswith('_d.exe'):
    try:
        import numpy as np
        randint = np.random.randint
        # The TEFB world table ids run from 1 to 10000 inclusive;
        # np.random.randint excludes the high bound, hence (1, 10001).
        randints = lambda size: randint(1, high=10001, size=size)
        # Shape (size, 2) so that iteration yields `size` (id, randomNumber)
        # pairs, matching the pure-Python fallback below.
        randints2d = lambda size: randint(1, high=10001, size=(size, 2))
        print("Using NumPy random facilities.")
    except ImportError:
        pass

if not np:
    import random
    randint = random.randint
    randints = lambda size: [ randint(1, 10000) for _ in range(0, size) ]
    randints2d = lambda size: [
        (x, y) for (x, y) in zip(randints(size), randints(size))
    ]
    print("Couldn't load NumPy; reverting to normal CPython random facilities.")
#===============================================================================
# Aliases
#===============================================================================
#===============================================================================
# Globals/Templates
#===============================================================================
localhost_connect_string = (
    'Driver={SQL Server};'
    'Server=localhost;'
    'Database=hello_world;'
    'Uid=benchmarkdbuser;'
    'Pwd=B3nchmarkDBPass;'
)
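# A quick sanity check of the connect string (hypothetical session; assumes
# the TEFB SQL Server instance is running locally with the hello_world
# database loaded):
#
# >>> con = pyodbc.connect(localhost_connect_string)
# >>> cur = con.cursor()
# >>> cur.execute('select count(*) from world').fetchone()[0]
# 10000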
#===============================================================================
# Helpers
#===============================================================================
#===============================================================================
# Classes
#===============================================================================
class Fortune:
    _bytes = [
        (b'fortune: No such file or directory', 1),
        (b"A computer scientist is someone who "
         b"fixes things that aren't broken.", 2),
        (b'After enough decimal places, nobody gives a damn.', 3),
        (b'A bad random number generator: 1, 1, 1, 1, '
         b'1, 4.33e+67, 1, 1, 1', 4),
        (b'A computer program does what you tell it to do, '
         b'not what you want it to do.', 5),
        (b'Emacs is a nice operating system, but I prefer UNIX. '
         b'\xe2\x80\x94 Tom Christaensen', 6),
        (b'Any program that runs right is obsolete.', 7),
        (b'A list is only as strong as its weakest link. '
         b'\xe2\x80\x94 Donald Knuth', 8),
        (b'Feature: A bug with seniority.', 9),
        (b'Computers make very fast, very accurate mistakes.', 10),
        (b'<script>alert("This should not be '
         b'displayed in a browser alert box.");</script>', 11),
        (b'\xe3\x83\x95\xe3\x83\xac\xe3\x83\xbc\xe3\x83\xa0'
         b'\xe3\x83\xaf\xe3\x83\xbc\xe3\x82\xaf\xe3\x81\xae'
         b'\xe3\x83\x99\xe3\x83\xb3\xe3\x83\x81\xe3\x83\x9e'
         b'\xe3\x83\xbc\xe3\x82\xaf', 12),
    ]

    fortunes = [ (r[0].decode('utf-8'), r[1]) for r in _bytes ]

    header = (
        '<!DOCTYPE html>'
        '<html>'
        '<head>'
        '<title>Fortunes</title>'
        '</head>'
        '<body>'
        '<table>'
        '<tr>'
        '<th>id</th>'
        '<th>message</th>'
        '</tr>'
    )
    row = '<tr><td>%d</td><td>%s</td></tr>'
    footer = (
        '</table>'
        '</body>'
        '</html>'
    )
    sql = 'select message, id from fortune'

    @classmethod
    def prepare_fortunes(cls, fortunes):
        # Copy the incoming rows as (message, id) tuples, add the extra
        # fortune at request time, then sort by message text.
        fortunes = [ (f[0], f[1]) for f in fortunes ]
        fortunes.append(('Begin. The rest is easy.', 0))
        fortunes.sort()
        return fortunes

    @classmethod
    def render(cls, fortunes):
        fortunes = cls.prepare_fortunes(fortunes)
        row = cls.row
        return ''.join((
            cls.header,
            ''.join([ row % (f[1], quote_html(f[0])) for f in fortunes ]),
            cls.footer,
        ))

    @classmethod
    def render_raw(cls):
        return cls.render(cls.fortunes)

    @classmethod
    def render_db(cls, connect_string=None):
        fortunes = cls.load_from_db(connect_string)
        return cls.render(fortunes)

    @classmethod
    def load_from_db(cls, connect_string=None):
        cs = connect_string or localhost_connect_string
        con = pyodbc.connect(cs)
        cur = con.cursor()
        cur.execute(cls.sql)
        return cur.fetchall()

    @classmethod
    def json_from_db(cls, connect_string=None):
        results = Fortune.load_from_db(connect_string)
        results = Fortune.prepare_fortunes(results)
        fortunes = { r[1]: r[0] for r in results }
        return json.dumps(fortunes)
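# The renderer can be exercised without a database (hypothetical session):
#
# >>> html = Fortune.render_raw()
# >>> html[:34]
# '<!DOCTYPE html><html><head><title>'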
routes = make_routes()
route = router(routes)

class TefbHttpServer(HttpServer):
    routes = routes
    # The handlers below read self.connect_string; default to the local SQL
    # Server instance defined above.
    connect_string = localhost_connect_string
    db_sql = 'select id, randomNumber from world where id = ?'
    update_sql = 'update world set randomNumber = ? where id = ?'
    fortune_sql = 'select * from fortune'
    @route
    def json(self, request):
        json_serialization(request, obj={'message': 'Hello, World'})

    @route
    def plaintext(self, request):
        text_response(request, text='Hello, World!')

    @route
    def fortunes(self, request):
        return html_response(request, Fortune.render_db(self.connect_string))

    @route
    def fortunes_raw(self, request):
        return html_response(request, Fortune.render_raw())

    @route
    def db(self, request):
        con = pyodbc.connect(self.connect_string)
        cur = con.cursor()
        cur.execute(self.db_sql, (randint(1, 10000),))
        results = cur.fetchall()
        db = {
            'id': results[0][0],
            'randomNumber': results[0][1],
        }
        return json_serialization(request, db)
    @route
    def queries(self, request):
        # Clamp the requested query count to the TEFB-mandated 1..500 range.
        count = try_int(request.query.get('queries')) or 1
        if count < 1:
            count = 1
        elif count > 500:
            count = 500
        con = pyodbc.connect(self.connect_string)
        cur = con.cursor()
        ids = randints(count)
        results = []
        for npi in ids:
            # Coerce possible numpy integers to plain ints before binding.
            i = int(npi)
            cur.execute(self.db_sql, i)
            r = cur.fetchall()
            results.append({'id': r[0][0], 'randomNumber': r[0][1]})
        return json_serialization(request, results)

    @route
    def updates(self, request):
        count = try_int(request.query.get('queries')) or 1
        if count < 1:
            count = 1
        elif count > 500:
            count = 500
        con = pyodbc.connect(self.connect_string)
        cur = con.cursor()
        ints2d = randints2d(count)
        results = []
        updates = []
        for npi in ints2d:
            (i, rn) = (int(npi[0]), int(npi[1]))
            cur.execute(self.db_sql, i)
            cur.fetchall()      # Drain the select; the row itself is unused.
            results.append((rn, i))
            updates.append({'id': i, 'randomNumber': rn})
        cur.executemany(self.update_sql, results)
        cur.commit()
        return json_serialization(request, updates)
    @route
    def stats(self, request):
        stats = {
            'system': dict(sys_stats()),
            'server': dict(socket_stats(request.transport.parent)),
            'memory': dict(memory_stats()),
            'contexts': dict(context_stats()),
            'elapsed': request.transport.elapsed(),
            'thread': parallel.thread_seq_id(),
        }
        json_serialization(request, stats)

    @route
    def hello(self, request, *args, **kwds):
        j = { 'args': args, 'kwds': kwds }
        return json_serialization(request, j)

    @route
    def shutdown(self, request):
        request.transport.shutdown_server()
        json_serialization(request, obj={'message': 'Shutdown'})

    @route
    def elapsed(self, request):
        obj = { 'elapsed': request.transport.elapsed() }
        json_serialization(request, obj)
plaintext_http11_response = (
    'HTTP/1.1 200 OK\r\n'
    'Server: PyParallel Web Server v0.1\r\n'
    'Date: Sat, 16 May 2015 15:21:34 GMT\r\n'
    'Content-Type: text/plain;charset=utf-8\r\n'
    'Content-Length: 15\r\n'
    '\r\n'
    'Hello, World!\r\n'
)

class CheatingPlaintextHttpServer:
    # "Cheating": skip HTTP parsing entirely and send the same precomputed
    # response bytes (stale Date header and all) for every request.
    initial_bytes_to_send = plaintext_http11_response
    next_bytes_to_send = plaintext_http11_response
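# Sanity check: the hard-coded Content-Length matches the body.
#
# >>> len('Hello, World!\r\n')
# 15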
# vim:set ts=8 sw=4 sts=4 tw=80 et:
"""
A very simple wiki search HTTP server that demonstrates useful techniques
afforded by PyParallel: the ability to load large reference data structures
into memory, and then query them as part of incoming request processing in
parallel.
"""
#===============================================================================
# Imports
#===============================================================================
import json
import parallel
import socket
import datrie
import string
import urllib
import numpy as np

from collections import (
    defaultdict,
)

from numpy import (
    uint64,
)

from parallel import (
    rdtsc,
    sys_stats,
    socket_stats,
    memory_stats,
    context_stats,
    call_from_main_thread,
    call_from_main_thread_and_wait,
    CachingBehavior,
)

from parallel.http.server import (
    router,
    make_routes,
    Request,
    HttpServer,
    RangedRequest,
)

from os.path import (
    join,
    exists,
    abspath,
    dirname,
    normpath,
)

def join_path(*args):
    return abspath(normpath(join(*args)))
#===============================================================================
# Configurables -- Change These!
#===============================================================================
# Change this to the directory containing the downloaded files.
#DATA_DIR = r'd:\data'
DATA_DIR = join_path(dirname(__file__), 'data')
# If you want to change the hostname listened on from the default (which will
# resolve to whatever IP address the computer name resolves to), do so here.
HOSTNAME = socket.gethostname()
# E.g.:
# HOSTNAME = 'localhost'
IPADDR = '0.0.0.0'
PORT = 8080
#===============================================================================
# Constants
#===============================================================================
# This file is huge when unzipped -- ~53GB. Although, granted, it is the
# entire Wikipedia in a single file. The bz2 version is much smaller, but
# still pretty huge. Search the web for instructions on how to download it
# from one of the wiki mirrors, then bunzip2 it and place it in the data
# directory above.
WIKI_XML_NAME = 'enwiki-20150205-pages-articles.xml'
WIKI_XML_PATH = join_path(DATA_DIR, WIKI_XML_NAME)
# The following two files can be downloaded from
# http://download.pyparallel.org.
# This is a trie keyed by every <title>xxx</title> in the wiki XML; each
# value is a list of 64-bit byte offsets within the file where the title was
# found -- specifically, the offset of the '<' character of the <title> tag.
TITLES_TRIE_PATH = join_path(DATA_DIR, 'titles.trie')
# And because this file is so huge and the data structure takes so long to
# load, we have another version that was created for titles starting with Z
# and z ('z' was picked on the assumption it would have the fewest titles).
# (This file was created via the save_titles_startingwith_z() method below.)
ZTITLES_TRIE_PATH = join_path(DATA_DIR, 'ztitles.trie')
# This is a sorted numpy array of uint64s representing the byte offset values
# in the trie. When given the byte offset of a title derived from a trie
# lookup, we can find the byte offset of where the next title starts within
# the xml file. That allows us to isolate the required byte range from the
# xml file where the particular title is defined. Such a byte range can be
# satisfied with a ranged HTTP request.
TITLES_OFFSETS_NPY_PATH = join_path(DATA_DIR, 'titles_offsets.npy')
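# A minimal sketch of the lookup these two structures enable (hypothetical
# title; the 7- and 11-byte adjustments mirror the wiki() handler below):
#
# >>> o  = uint64(titles['Zygote'][0])  # offset of '<' in '<title>Zygote'
# >>> ix = offsets.searchsorted(o, side='right')
# >>> (start, end) = (int(o - uint64_7), int(offsets[ix] - uint64_11))
# >>> # start..end is the byte range of that page within the wiki XML,
# >>> # ready to be served via a ranged request against the XML file.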
#===============================================================================
# Aliases
#===============================================================================
uint64_7 = uint64(7)
uint64_11 = uint64(11)
#===============================================================================
# Globals
#===============================================================================
#wiki_xml = parallel.open(WIKI_XML_PATH, 'r', caching=CachingBehavior.RandomAccess)
offsets = np.load(TITLES_OFFSETS_NPY_PATH)
# Use the smaller one if the larger one doesn't exist.
if not exists(TITLES_TRIE_PATH):
    TRIE_PATH = ZTITLES_TRIE_PATH
else:
    TRIE_PATH = TITLES_TRIE_PATH
print("About to load titles trie, this will take a while...")
titles = datrie.Trie.load(TRIE_PATH)
#===============================================================================
# Misc Helpers
#===============================================================================
def save_titles_startingwith_z():
    # Ok, the 11GB trie that takes 2 minutes to load is painful to develop
    # with; let's whip up a little helper that just works on titles starting
    # with 'Z'.
    path = TITLES_TRIE_PATH.replace('titles.', 'ztitles.')
    allowed = (string.printable + string.punctuation)
    ztrie = datrie.Trie(allowed)
    for c in ('Z', 'z'):
        for (key, value) in titles.items(c):
            if key in ztrie:
                # Merge offsets into the existing entry, avoiding duplicates.
                existing = ztrie[key]
                for v in value:
                    if v not in existing:
                        existing.append(v)
                existing.sort()
            else:
                ztrie[key] = value
    ztrie.save(path)
def json_serialization(request=None, obj=None):
    """
    Helper for converting a dict `obj` into a JSON response for the
    incoming `request`.
    """
    transport = None
    if not request:
        request = Request(transport=None, data=None)
    else:
        transport = request.transport
    if not obj:
        obj = {}
    #parallel.debug('obj: %r' % obj)
    response = request.response
    response.code = 200
    response.message = 'OK'
    response.content_type = 'application/json; charset=UTF-8'
    response.body = json.dumps(obj)
    return request

def text_serialization(request=None, text=None):
    """
    Like json_serialization(), but for a plain-text response.
    """
    transport = None
    if not request:
        request = Request(transport=None, data=None)
    else:
        transport = request.transport
    if not text:
        text = 'Hello, World!'
    response = request.response
    response.code = 200
    response.message = 'OK'
    response.content_type = 'text/plain; charset=UTF-8'
    response.body = text
    return request
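# Because both helpers synthesize a bare Request when one isn't supplied,
# they can be exercised offline (hypothetical session):
#
# >>> r = json_serialization(obj={'message': 'hi'})
# >>> r.response.body
# '{"message": "hi"}'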
#===============================================================================
# Offset Helpers
#===============================================================================
# Three implementations of the same functionality: given a key, look up all
# items in the trie starting with that key, then return the relevant offsets
# for each one, such that a client can then issue a ranged HTTP request for
# the bytes returned.
# >>> results = titles.items('Ap')
# >>> len(results)
# 16333
#
# So, how long does it take to construct the result set for 16,333 hits?
# (That is, 16,333 Wikipedia pages whose page title starts with 'Ap'.)
#
# >>> from ctk.util import timer
# >>> with timer.timeit():
# ... _ = dumps(get_page_offsets_for_key2('Ap'))
# ...
# 274ms
# >>> with timer.timeit():
# ... _ = dumps(get_page_offsets_for_key('Ap'))
# ...
# 278ms
# >>> with timer.timeit():
# ... _ = dumps(get_page_offsets_for_key3('Ap'))
# ...
# 256ms
def get_page_offsets_for_key(search_string):
    items = titles.items(search_string)
    results = defaultdict(list)
    for (key, value) in items:
        for v in value:
            # Offsets may be stored negated; normalize before searching.
            o = uint64(v if v > 0 else v*-1)
            ix = offsets.searchsorted(o, side='right')
            results[key].append((int(o-uint64_7), int(offsets[ix]-uint64_11)))
    return results or None

def get_page_offsets_for_key2(search_string):
    items = titles.items(search_string)
    if not items:
        return None
    results = [ [None, None, None] for _ in range(0, len(items)) ]
    assert len(results) == len(items), (len(results), len(items))
    for (i, pair) in enumerate(items):
        (key, value) = pair
        for (j, v) in enumerate(value):
            rx = i + j
            o = uint64(v if v > 0 else v*-1)
            ix = offsets.searchsorted(o, side='right')
            results[rx][0] = key
            results[rx][1] = int(o-uint64_7)
            results[rx][2] = int(offsets[ix]-uint64_11)
    return results

def get_page_offsets_for_key3(search_string):
    results = []
    items = titles.items(search_string)
    if not items:
        return results
    for (key, value) in items:
        # Only the first offset for each title is considered.
        v = value[0]
        o = uint64(v if v > 0 else v*-1)
        ix = offsets.searchsorted(o, side='right')
        results.append((key, int(o-uint64_7), int(offsets[ix]-uint64_11)))
    return results
#===============================================================================
# Web Helpers
#===============================================================================
def exact_title(title):
    if title in titles:
        return json.dumps([[title, ] + [ t for t in titles[title] ]])
    else:
        return json.dumps([])
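# E.g. (hypothetical title and offset):
#
# >>> exact_title('Zygote')
# '[["Zygote", 123456789]]'
# >>> exact_title('No Such Title')
# '[]'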
#===============================================================================
# Classes
#===============================================================================
routes = make_routes()
route = router(routes)

class WikiServer(HttpServer):
    routes = routes

    @route
    def wiki(self, request, name, **kwds):
        # Serve the page directly if we have an exact title match.
        if name not in titles:
            return self.error(request, 404)
        o = titles[name][0]
        o = uint64(o if o > 0 else o*-1)
        ix = offsets.searchsorted(o, side='right')
        start = int(o-uint64_7)
        end = int(offsets[ix]-uint64_11)
        range_request = '%d-%d' % (start, end)
        request.range = RangedRequest(range_request)
        request.response.content_type = 'text/xml; charset=utf-8'
        return self.sendfile(request, WIKI_XML_PATH)

    @route
    def offsets(self, request, name, limit=None):
        if not name:
            return self.error(request, 400, "Missing name")
        if len(name) < 3:
            return self.error(request, 400, "Name too short (< 3 chars)")
        return json_serialization(request, get_page_offsets_for_key3(name))

    @route
    def xml(self, request, *args, **kwds):
        if not request.range:
            return self.error(request, 400, "Ranged-request required.")
        else:
            request.response.content_type = 'text/xml; charset=utf-8'
            return self.sendfile(request, WIKI_XML_PATH)

    @route
    def stats(self, request, *args, **kwds):
        stats = {
            'system': dict(sys_stats()),
            'server': dict(socket_stats(request.transport.parent)),
            'memory': dict(memory_stats()),
            'contexts': dict(context_stats()),
            'elapsed': request.transport.elapsed(),
            'thread': parallel.thread_seq_id(),
        }
        if args:
            name = args[0]
            if name in stats:
                stats = { name: stats[name] }
        return json_serialization(request, stats)

    @route
    def hello(self, request, *args, **kwds):
        j = { 'args': args, 'kwds': kwds }
        return json_serialization(request, j)

    @route
    def title(self, request, name, *args, **kwds):
        items = titles.items(name)
        return json_serialization(request, items)

    @route
    def elapsed(self, request, *args, **kwds):
        obj = { 'elapsed': request.transport.elapsed() }
        return json_serialization(request, obj)

    @route
    def json(self, request, *args, **kwds):
        return json_serialization(request, {'message': 'Hello, World!'})

    @route
    def plaintext(self, request, *args, **kwds):
        return text_serialization(request, text='Hello, World!')
#===============================================================================
# Main
#===============================================================================
def main():
    server = parallel.server(IPADDR, PORT)
    parallel.register(transport=server, protocol=WikiServer)
    parallel.run_once()
    return server

if __name__ == '__main__':
    server = main()
    parallel.run()
# vim:set ts=8 sw=4 sts=4 tw=78 et: