PyParallel

An experimental, proof-of-concept fork of Python 3
designed to optimally exploit multiple CPU cores, fast
SSDs, NUMA architectures and 10Gb+ Ethernet networks.

Removes the limitation of the Global Interpreter Lock (GIL)
without needing to remove it at all.

Async I/O meets multicore.

PyParallel combines the concurrency benefits of single-threaded asynchronous I/O
architectures with the performance benefits of running threads simultaneously on multiple cores.

Scales linearly with cores and I/O bandwidth.

PyParallel exhibits excellent linear scaling across all cores when sufficient client
load can be generated, while maintaining low latency and high throughput under load.

Includes PyParallel-compatible versions of NumPy, datrie, pyodbc, IPython, IPython Notebook, Pandas, Cython and many more.

Load large NumPy arrays or datrie tries in main memory once, then access them
efficiently from multiple cores.
Or, connect to an ODBC database and access data in parallel.
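The load-once, query-from-many-cores pattern can be sketched in stock Python. This is an illustration under stated assumptions, not PyParallel code: a sorted list searched with `bisect` stands in for the datrie trie, `concurrent.futures.ThreadPoolExecutor` stands in for PyParallel's parallel contexts, and the titles are made-up sample data. Under standard CPython the GIL still serializes these lookups; the sketch only shows the shape of the pattern (build once, share read-only, never mutate from workers).

```python
import bisect
from concurrent.futures import ThreadPoolExecutor

# Built once at startup and treated as read-only afterwards, so worker
# threads can share it without locks. A sorted list of titles stands in
# for the datrie trie used by the Parallelopedia example (an assumption).
TITLES = sorted(["Apple", "Applet", "Apricot", "Banana", "Zebra", "Zurich"])

def titles_with_prefix(prefix):
    """Return all titles starting with `prefix`, in sorted order."""
    i = bisect.bisect_left(TITLES, prefix)
    matches = []
    # All keys sharing a prefix are contiguous in a sorted list.
    while i < len(TITLES) and TITLES[i].startswith(prefix):
        matches.append(TITLES[i])
        i += 1
    return matches

def lookup_all(prefixes, workers=4):
    """Run many read-only lookups concurrently; results keep input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(titles_with_prefix, prefixes))
```

For example, `lookup_all(["Ap", "Z", "Q"])` returns `[['Apple', 'Applet', 'Apricot'], ['Zebra', 'Zurich'], []]`.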

Ultra-low latency, high concurrency, maximal throughput.

All delivered from a single Python process that dynamically adjusts its
behavior to maximize the performance of the underlying hardware.

Performance

Linear Scaling

PyParallel's throughput (HTTP requests per second) scales linearly as concurrency
increases, indicating efficient use of multiple cores. Percentile latencies also
remain low across the entire concurrency range.

Benchmarks

PyParallel fares well against the competition. In the following benchmark,
PyParallel (Fast) uses the new SSE-accelerated HTTP parser along with other
optimizations, whereas PyParallel (Slow) uses an HTTP parser written entirely
in Python. The test was run with a concurrency of 8. All times are in
microseconds.
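Latency percentiles like those referred to above are computed from raw per-request samples. As an illustration only (the nearest-rank method and the sample values are assumptions, not the method used to produce PyParallel's published numbers), a summary can be built like this:

```python
import math

def percentile(samples_us, pct):
    """Nearest-rank percentile of latency samples given in microseconds."""
    if not samples_us:
        raise ValueError("no samples")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples_us)
    # Smallest sample such that at least pct% of all samples are <= it.
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]

def summarize(samples_us, pcts=(50, 90, 99, 100)):
    """Map each requested percentile to its latency in microseconds."""
    return {p: percentile(samples_us, p) for p in pcts}
```

For example, `summarize([120, 95, 110, 400, 105])` returns `{50: 110, 90: 400, 99: 400, 100: 400}`: with only a handful of samples, one slow request dominates every tail percentile, which is why meaningful tail figures need many requests per concurrency level.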

Presentations

For more information about how PyParallel was implemented, see the following presentations.

Parallelizing the Python Interpreter: An alternate approach to async
March 2013 (35 pages)
A short presentation given to Python core developers at PyCon in March 2013.

PyParallel: How we removed the GIL and exploited all cores (without needing to remove the GIL at all)
November 2013 (153 pages)
The first major public announcement of PyParallel, delivered at PyData NYC in November 2013. A deep dive into the background of PyParallel, with a focus on asynchronous I/O and event-driven socket servers.

Parallelism and Concurrency with Python
May 2014 (41 pages)
A higher-level look at the notions of parallelism and concurrency within the Python ecosystem, delivered at the Bank of America developer summit day in NYC in May 2014.

Examples

Some real-life examples of PyParallel.

  • Plaintext (Fast)
  • Plaintext (Slow)
  • TEFB
  • Parallelopedia

# PyParallel (Fast)
# (Uses picohttpparser's SSE-accelerated HTTP parser.)
import parallel
class Plaintext:
    http11 = True
    def plaintext(self, transport, data):
        return b'Hello, World!'

server = parallel.server('0.0.0.0', 8080)
parallel.register(transport=server, protocol=Plaintext)
parallel.run()
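For comparison with the single-threaded async I/O model that PyParallel builds on, here is a sketch of the same plaintext endpoint written against the standard library's `asyncio` rather than PyParallel (the `parallel` module is not used). It assumes GET requests without bodies, reads only up to the blank line that ends each request head, and always answers with a fixed HTTP/1.1 response. Every connection is served by one event loop on one thread, which is the limitation PyParallel targets.

```python
import asyncio

BODY = b"Hello, World!"
# A fixed HTTP/1.1 response; Content-Length is computed from BODY.
RESPONSE = (
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: text/plain; charset=utf-8\r\n"
    + b"Content-Length: %d\r\n\r\n" % len(BODY)
    + BODY
)

async def handle(reader, writer):
    """Answer every request on one connection with RESPONSE (keep-alive)."""
    try:
        while True:
            # Wait for the end of one request head; bodies are not expected.
            await reader.readuntil(b"\r\n\r\n")
            writer.write(RESPONSE)
            await writer.drain()
    except (asyncio.IncompleteReadError, asyncio.LimitOverrunError,
            ConnectionError):
        pass  # client closed the connection or sent an oversized head
    finally:
        writer.close()

async def serve(host="0.0.0.0", port=8080):
    # One event loop on one thread handles every connection.
    server = await asyncio.start_server(handle, host, port)
    async with server:
        await server.serve_forever()
```

Start it with `asyncio.run(serve())`.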

# PyParallel (Slow)
# (Uses Python-based HTTP parser)
import parallel
from parallel.http.server import (
    router,
    make_routes,
    text_response,

    HttpServer,
)

routes = make_routes()
route = router(routes)

class Plaintext(HttpServer):
    routes = routes

    @route
    def plaintext(self, request):
        return text_response(request, text='Hello, World!')

server = parallel.server('0.0.0.0', 8080)
parallel.register(transport=server, protocol=Plaintext)
parallel.run()
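The slow example relies on `make_routes()` and `router(routes)` to map URL paths onto methods decorated with `@route`. PyParallel's own implementation is not reproduced here; the following is a hypothetical, self-contained sketch of how such a decorator-based router can work. `make_routes`, `router` and `dispatch` are all defined locally for illustration and are not PyParallel's API.

```python
def make_routes():
    """Hypothetical: a fresh name -> function table for one server class."""
    return {}

def router(routes):
    """Hypothetical: return a decorator that records methods in `routes`."""
    def route(func):
        routes[func.__name__] = func
        return func
    return route

routes = make_routes()
route = router(routes)

class PlaintextServer:
    routes = routes

    @route
    def plaintext(self):
        return 'Hello, World!'

    @route
    def hello(self, *args):
        return {'args': list(args)}

    def dispatch(self, path):
        # "/hello/a/b" -> route "hello", positional args ("a", "b").
        parts = [p for p in path.split('/') if p]
        if not parts or parts[0] not in self.routes:
            return None  # a real server would answer 404 here
        return self.routes[parts[0]](self, *parts[1:])
```

Because `dispatch` is not decorated, it is never reachable as a URL; only methods explicitly marked with `@route` are exposed.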

# Implementation of the entire TEFB suite in pure Python.
#===============================================================================
# Imports
#===============================================================================
import sys
import json
import pyodbc

import parallel

from ctk.util import (
    try_int,
    Dict,
)

from parallel import (
    rdtsc,
    sys_stats,
    socket_stats,
    memory_stats,
    context_stats,
    enable_heap_override,
    disable_heap_override,
    call_from_main_thread,
    call_from_main_thread_and_wait,
)

from parallel.http.server import (
    quote_html,
    router,
    make_routes,
    text_response,
    html_response,
    json_serialization,

    Request,
    HttpServer,
)

# It can be a pain setting up the environment to load the debug version of
# numpy, so, if we can't import it, just default to normal Python random.
np = None
if not sys.executable.endswith('_d.exe'):
    try:
        import numpy as np
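        randint = np.random.randint
        # NumPy's `high` bound is exclusive, so high=10001 yields 1..10000,
        # matching random.randint(1, 10000) in the fallback below.
        randints = lambda size: randint(1, high=10001, size=size)
        # One (id, randomNumber) row per query: shape (size, 2), not (2, size).
        randints2d = lambda size: randint(1, high=10001, size=(size, 2))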
        print("Using NumPy random facilities.")
    except ImportError:
        pass

if np is None:
    import random
    randint = random.randint
    randints = lambda size: [ randint(1, 10000) for _ in range(0, size) ]
    randints2d = lambda size: [
        (x, y) for (x, y) in zip(randints(size), randints(size))
    ]
    print("Couldn't load NumPy; reverting to normal CPython random facilities.")

#===============================================================================
# Aliases
#===============================================================================

#===============================================================================
# Globals/Templates
#===============================================================================
localhost_connect_string = (
    'Driver={SQL Server};'
    'Server=localhost;'
    'Database=hello_world;'
    'Uid=benchmarkdbuser;'
    'Pwd=B3nchmarkDBPass;'
)

#===============================================================================
# Helpers
#===============================================================================

#===============================================================================
# Classes
#===============================================================================
class Fortune:
    _bytes = [
        (b'fortune: No such file or directory',                                1),
        (b"A computer scientist is someone who "
         b"fixes things that aren't broken.",                                  2),
        (b'After enough decimal places, nobody gives a damn.',                 3),
        (b'A bad random number generator: 1, 1, 1, 1, '
         b'1, 4.33e+67, 1, 1, 1',                                              4),
        (b'A computer program does what you tell it to do, '
         b'not what you want it to do.',                                       5),
        (b'Emacs is a nice operating system, but I prefer UNIX. '
         b'\xe2\x80\x94 Tom Christaensen',                                     6),
        (b'Any program that runs right is obsolete.',                          7),
        (b'A list is only as strong as its weakest link. '
         b'\xe2\x80\x94 Donald Knuth',                                         8),
        (b'Feature: A bug with seniority.',                                    9),
        (b'Computers make very fast, very accurate mistakes.',                10),
        (b'<script>alert("This should not be '
         b'displayed in a browser alert box.");</script>',                    11),
        (b'\xe3\x83\x95\xe3\x83\xac\xe3\x83\xbc\xe3\x83\xa0'
         b'\xe3\x83\xaf\xe3\x83\xbc\xe3\x82\xaf\xe3\x81\xae'
         b'\xe3\x83\x99\xe3\x83\xb3\xe3\x83\x81\xe3\x83\x9e'
         b'\xe3\x83\xbc\xe3\x82\xaf',                                         12),
    ]

    fortunes = [ (r[0].decode('utf-8'), r[1]) for r in _bytes ]

    header = (
      '<!DOCTYPE html>'
      '<html>'
        '<head>'
          '<title>Fortunes</title>'
        '</head>'
        '<body>'
          '<table>'
            '<tr>'
              '<th>id</th>'
              '<th>message</th>'
            '</tr>'
    )
    row = '<tr><td>%d</td><td>%s</td></tr>'
    footer = (
          '</table>'
        '</body>'
      '</html>'
    )

    sql = 'select message, id from fortune'

    @classmethod
    def prepare_fortunes(cls, fortunes):
        fortunes = [ (f[0], f[1]) for f in fortunes ]
        fortunes.append(('Begin.  The rest is easy.', 0))
        fortunes.sort()
        return fortunes

    @classmethod
    def render(cls, fortunes):
        fortunes = cls.prepare_fortunes(fortunes)
        row = cls.row
        return ''.join((
            cls.header,
            ''.join([ row % (f[1], quote_html(f[0])) for f in fortunes ]),
            cls.footer,
        ))

    @classmethod
    def render_raw(cls):
        return cls.render(cls.fortunes)

    @classmethod
    def render_db(cls, connect_string=None):
        fortunes = cls.load_from_db(connect_string)
        return cls.render(fortunes)

    @classmethod
    def load_from_db(cls, connect_string=None):
        cs = connect_string or localhost_connect_string
        con = pyodbc.connect(cs)
        cur = con.cursor()
        cur.execute(cls.sql)
        return cur.fetchall()

    @classmethod
    def json_from_db(cls, connect_string=None):
        results = Fortune.load_from_db(connect_string)
        results = Fortune.prepare_fortunes(results)
        fortunes = { r[1]: r[0] for r in results }
        return json.dumps(fortunes)

routes = make_routes()
route = router(routes)

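class TefbHttpServer(HttpServer):
    routes = routes

    # Used by the database-backed routes (fortunes, db, queries, updates);
    # point this at a different server if needed.
    connect_string = localhost_connect_string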

    db_sql = 'select id, randomNumber from world where id = ?'
    update_sql = 'update world set randomNumber = ? where id = ?'
    fortune_sql = 'select * from fortune'

    @route
    def json(self, request):
        return json_serialization(request, obj={'message': 'Hello, World!'})

    @route
    def plaintext(self, request):
        return text_response(request, text='Hello, World!')

    @route
    def fortunes(self, request):
        return html_response(request, Fortune.render_db(self.connect_string))

    @route
    def fortunes_raw(self, request):
        return html_response(request, Fortune.render_raw())

    @route
    def db(self, request):
        con = pyodbc.connect(self.connect_string)
        cur = con.cursor()
        # Draw one id via randints() (NumPy or random) and convert it to a
        # plain Python int for pyodbc.
        cur.execute(self.db_sql, int(randints(1)[0]))
        results = cur.fetchall()
        db = {
            'id': results[0][0],
            'randomNumber': results[0][1],
        }
        return json_serialization(request, db)

    @route
    def queries(self, request):
        count = try_int(request.query.get('queries')) or 1
        if count < 1:
            count = 1
        elif count > 500:
            count = 500

        con = pyodbc.connect(self.connect_string)
        cur = con.cursor()
        ids = randints(count)
        results = []
        for npi in ids:
            i = int(npi)
            cur.execute(self.db_sql, i)
            r = cur.fetchall()
            results.append({'id': r[0][0], 'randomNumber': r[0][1]})
        return json_serialization(request, results)

    @route
    def updates(self, request):
        count = try_int(request.query.get('queries')) or 1
        if count < 1:
            count = 1
        elif count > 500:
            count = 500

        con = pyodbc.connect(self.connect_string)
        cur = con.cursor()
        ints2d = randints2d(count)
        results = []
        updates = []
        for npi in ints2d:
            (i, rn) = (int(npi[0]), int(npi[1]))
            cur.execute(self.db_sql, i)
            o = cur.fetchall()
            results.append((rn, i))
            updates.append({'id': i, 'randomNumber': rn})

        cur.executemany(self.update_sql, results)
        cur.commit()
        return json_serialization(request, updates)

    @route
    def stats(self, request):
        stats = {
            'system': dict(sys_stats()),
            'server': dict(socket_stats(request.transport.parent)),
            'memory': dict(memory_stats()),
            'contexts': dict(context_stats()),
            'elapsed': request.transport.elapsed(),
            'thread': parallel.thread_seq_id(),
        }
        return json_serialization(request, stats)

    @route
    def hello(self, request, *args, **kwds):
        j = { 'args': args, 'kwds': kwds }
        return json_serialization(request, j)

    @route
    def shutdown(self, request):
        request.transport.shutdown_server()
        return json_serialization(request, obj={'message': 'Shutdown'})

    @route
    def elapsed(self, request):
        obj = { 'elapsed': request.transport.elapsed() }
        return json_serialization(request, obj)

plaintext_http11_response = (
    'HTTP/1.1 200 OK\r\n'
    'Server: PyParallel Web Server v0.1\r\n'
    'Date: Sat, 16 May 2015 15:21:34 GMT\r\n'
    'Content-Type: text/plain;charset=utf-8\r\n'
    'Content-Length: 15\r\n'
    '\r\n'
    'Hello, World!\r\n'
)
class CheatingPlaintextHttpServer:
    initial_bytes_to_send = plaintext_http11_response
    next_bytes_to_send = plaintext_http11_response

# vim:set ts=8 sw=4 sts=4 tw=80 et:
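The `queries` and `updates` handlers above both clamp the `queries` parameter to the range 1 to 500 and draw row ids between 1 and 10,000. A small sketch of those two rules in isolation, using only the standard library: `parse_int`, `clamp_queries` and `random_ids` are illustrative names, not part of PyParallel, and `parse_int` is an assumed stand-in for `ctk.util.try_int` (returning None on invalid input).

```python
import random

def parse_int(value):
    """Return int(value), or None if value is missing or not an integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

def clamp_queries(value, lo=1, hi=500):
    """Missing or invalid -> lo; otherwise clamp into [lo, hi]."""
    n = parse_int(value)
    if n is None:
        return lo
    return max(lo, min(hi, n))

def random_ids(count, max_id=10000, rng=random):
    # random.randint is inclusive at both ends, so ids fall in 1..max_id.
    return [rng.randint(1, max_id) for _ in range(count)]
```

Note that NumPy's `np.random.randint(low, high)` excludes `high`, whereas `random.randint(a, b)` includes `b`; mixing the two conventions is an easy way to end up with ids in the wrong range.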

"""
A very simple wiki search HTTP server that demonstrates useful techniques
afforded by PyParallel: the ability to load large reference data structures
into memory, and then query them as part of incoming request processing in
parallel.
"""
#===============================================================================
# Imports
#===============================================================================
import json
import parallel
import socket
import datrie
import string
import urllib
import numpy as np

from collections import (
    defaultdict,
)

from numpy import (
    uint64,
)

from parallel import (
    rdtsc,

    sys_stats,
    socket_stats,
    memory_stats,
    context_stats,

    call_from_main_thread,
    call_from_main_thread_and_wait,

    CachingBehavior,
)

from parallel.http.server import (
    router,
    make_routes,

    Request,
    HttpServer,
    RangedRequest,
)

from os.path import (
    join,
    exists,
    abspath,
    dirname,
    normpath,
)

def join_path(*args):
    return abspath(normpath(join(*args)))

#===============================================================================
# Configurables -- Change These!
#===============================================================================
# Change this to the directory containing the downloaded files.
#DATA_DIR = r'd:\data'
DATA_DIR = join_path(dirname(__file__), 'data')

# If you want to change the hostname listened on from the default (which will
# resolve to whatever IP address the computer name resolves to), do so here.
HOSTNAME = socket.gethostname()
# E.g.:
# HOSTNAME = 'localhost'
IPADDR = '0.0.0.0'
PORT = 8080

#===============================================================================
# Constants
#===============================================================================

# This file is huge when unzipped -- ~53GB.  Although, granted, it is the
# entire Wikipedia in a single file.  The bz2 version is much smaller, but
# still pretty huge.  Search the web for instructions on how to download
# from one of the wiki mirrors, then bunzip2 it, then place in the same
# data directory.
WIKI_XML_NAME = 'enwiki-20150205-pages-articles.xml'
WIKI_XML_PATH = join_path(DATA_DIR, WIKI_XML_NAME)

# The following two files can be downloaded from
# http://download.pyparallel.org.

# This is a trie keyed by every <title>xxx</title> in the wiki XML; the value
# is a 64-bit byte offset within the file where the title was found.
# Specifically, it is the offset where the '<' bit of the <title> was found.
TITLES_TRIE_PATH = join_path(DATA_DIR, 'titles.trie')
# And because this file is so huge and the data structure takes so long to
# load, we have another version that was created for titles starting with Z
# and z (I picked 'z' as I figured it would have the fewest titles).  (This
# file was created via the save_titles_startingwith_z() function below.)
ZTITLES_TRIE_PATH = join_path(DATA_DIR, 'ztitles.trie')

# This is a sorted numpy array of uint64s representing the byte offset values
# in the trie.  When given the byte offset of a title derived from a trie
# lookup, we can find the byte offset of where the next title starts within
# the xml file.  That allows us to isolate the required byte range from the
# xml file where the particular title is defined.  Such a byte range can be
# satisfied with a ranged HTTP request.
TITLES_OFFSETS_NPY_PATH = join_path(DATA_DIR, 'titles_offsets.npy')

#===============================================================================
# Aliases
#===============================================================================
uint64_7 = uint64(7)
uint64_11 = uint64(11)
#===============================================================================
# Globals
#===============================================================================
#wiki_xml = parallel.open(WIKI_XML_PATH, 'r', caching=CachingBehavior.RandomAccess)

offsets = np.load(TITLES_OFFSETS_NPY_PATH)

# Use the smaller one if the larger one doesn't exist.
if not exists(TITLES_TRIE_PATH):
    TRIE_PATH = ZTITLES_TRIE_PATH
else:
    TRIE_PATH = TITLES_TRIE_PATH

print("About to load titles trie, this will take a while...")
titles = datrie.Trie.load(TRIE_PATH)

#===============================================================================
# Misc Helpers
#===============================================================================
def save_titles_startingwith_z():
    # Ok, the 11GB trie that takes 2 minutes to load is painful to develop
    # with; let's whip up a little helper that just works on titles starting
    # with 'Z'.
    path = TITLES_TRIE_PATH.replace('titles.', 'ztitles.')
    allowed = (string.printable + string.punctuation)
    ztrie = datrie.Trie(allowed)
    for c in ('Z', 'z'):
        for (key, value) in titles.items(c):
            if key in ztrie:
                existing = ztrie[key]
                for v in value:
                    if v not in existing:
                        existing.append(v)
                        existing.sort()
            else:
                ztrie[key] = value
    ztrie.save(path)

def json_serialization(request=None, obj=None):
    """
    Helper method for converting a dict `obj` into a JSON response for the
    incoming `request`.
    """
    transport = None
    if not request:
        request = Request(transport=None, data=None)
    else:
        transport = request.transport
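    # Only substitute a default for a missing object; an empty list of
    # search results should still serialize as [].
    if obj is None:
        obj = {}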
    #parallel.debug('obj: %r' % obj)
    response = request.response
    response.code = 200
    response.message = 'OK'
    response.content_type = 'application/json; charset=UTF-8'
    response.body = json.dumps(obj)

    return request

def text_serialization(request=None, text=None):
    transport = None
    if not request:
        request = Request(transport=None, data=None)
    else:
        transport = request.transport
    if not text:
        text = 'Hello, World!'
    response = request.response
    response.code = 200
    response.message = 'OK'
    response.content_type = 'text/plain; charset=UTF-8'
    response.body = text

    return request


#===============================================================================
# Offset Helpers
#===============================================================================

# Three implementations of the same functionality: given a key, look up all
# items in the trie starting with that key, then return the relevant offsets
# for each one, such that a client can then issue a ranged HTTP request for
# the bytes returned.
#   >>> results = titles.items('Ap')
#   >>> len(results)
#   16333
#
# So, how long does it take to construct the result set for 16,333 hits?
# (That is, 16,333 Wikipedia pages whose page title starts with 'Ap'.)
#
#   >>> from ctk.util import timer
#   >>> with timer.timeit():
#   ...     _ = dumps(get_page_offsets_for_key2('Ap'))
#   ...
#   274ms

#   >>> with timer.timeit():
#   ...     _ = dumps(get_page_offsets_for_key('Ap'))
#   ...
#   278ms

#   >>> with timer.timeit():
#   ...     _ = dumps(get_page_offsets_for_key3('Ap'))
#   ...
#   256ms

def get_page_offsets_for_key(search_string):
    items = titles.items(search_string)
    results = defaultdict(list)
    for (key, value) in items:
        for v in value:
            o = uint64(v if v > 0 else v*-1)
            ix = offsets.searchsorted(o, side='right')
            results[key].append((int(o-uint64_7), int(offsets[ix]-uint64_11)))
    return results or None

def get_page_offsets_for_key2(search_string):
    items = titles.items(search_string)
    if not items:
        return None
    # Preallocate one row per (key, offset) pair; a title can map to more
    # than one offset, so size by the total number of values, not of keys.
    total = sum(len(value) for (_, value) in items)
    results = [ [None, None, None] for _ in range(0, total) ]
    rx = 0
    for (key, value) in items:
        for v in value:
            o = uint64(v if v > 0 else v*-1)
            ix = offsets.searchsorted(o, side='right')
            results[rx][0] = key
            results[rx][1] = int(o-uint64_7)
            results[rx][2] = int(offsets[ix]-uint64_11)
            rx += 1
    return results

def get_page_offsets_for_key3(search_string):
    results = []
    items = titles.items(search_string)
    if not items:
        return results
    for (key, value) in items:
        v = value[0]
        o = uint64(v if v > 0 else v*-1)
        ix = offsets.searchsorted(o, side='right')
        results.append((key, int(o-uint64_7), int(offsets[ix]-uint64_11)))
    return results

#===============================================================================
# Web Helpers
#===============================================================================
def exact_title(title):
    if title in titles:
        return json.dumps([[title, ] + [ t for t in titles[title] ]])
    else:
        return json.dumps([])

#===============================================================================
# Classes
#===============================================================================
routes = make_routes()
route = router(routes)

class WikiServer(HttpServer):
    routes = routes

    @route
    def wiki(self, request, name, **kwds):
        # Do an exact lookup if we find a match.
        if name not in titles:
            return self.error(request, 404)

        o = titles[name][0]
        o = uint64(o if o > 0 else o*-1)
        ix = offsets.searchsorted(o, side='right')
        start = int(o-uint64_7)
        end = int(offsets[ix]-uint64_11)
        range_request = '%d-%d' % (start, end)
        request.range = RangedRequest(range_request)
        request.response.content_type = 'text/xml; charset=utf-8'
        return self.sendfile(request, WIKI_XML_PATH)

    @route
    def offsets(self, request, name, limit=None):
        if not name:
            return self.error(request, 400, "Missing name")

        if len(name) < 3:
            return self.error(request, 400, "Name too short (< 3 chars)")

        return json_serialization(request, get_page_offsets_for_key3(name))

    @route
    def xml(self, request, *args, **kwds):
        if not request.range:
            return self.error(request, 400, "Ranged-request required.")
        else:
            request.response.content_type = 'text/xml; charset=utf-8'
            return self.sendfile(request, WIKI_XML_PATH)

    @route
    def stats(self, request, *args, **kwds):
        stats = {
            'system': dict(sys_stats()),
            'server': dict(socket_stats(request.transport.parent)),
            'memory': dict(memory_stats()),
            'contexts': dict(context_stats()),
            'elapsed': request.transport.elapsed(),
            'thread': parallel.thread_seq_id(),
        }
        if args:
            name = args[0]
            if name in stats:
                stats = { name: stats[name] }
        return json_serialization(request, stats)

    @route
    def hello(self, request, *args, **kwds):
        j = { 'args': args, 'kwds': kwds }
        return json_serialization(request, j)

    @route
    def title(self, request, name, *args, **kwds):
        items = titles.items(name)
        return json_serialization(request, items)

    @route
    def elapsed(self, request, *args, **kwds):
        obj = { 'elapsed': request.transport.elapsed() }
        return json_serialization(request, obj)

    @route
    def json(self, request, *args, **kwds):
        return json_serialization(request, {'message': 'Hello, World!'})

    @route
    def plaintext(self, request, *args, **kwds):
        return text_serialization(request, text='Hello, World!')

#===============================================================================
# Main
#===============================================================================

def main():
    server = parallel.server(IPADDR, PORT)
    parallel.register(transport=server, protocol=WikiServer)
    parallel.run_once()
    return server

if __name__ == '__main__':
    server = main()
    parallel.run()


# vim:set ts=8 sw=4 sts=4 tw=78 et:
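The wiki server turns a title's byte offset into a byte range by finding the next title's offset in the sorted `offsets` array with `searchsorted(..., side='right')`, then subtracting the constants 7 and 11 used in the code above. The following sketch reproduces that arithmetic with the standard library's `bisect.bisect_right` on a plain sorted list and reads the resulting inclusive range from a file, standing in for `RangedRequest` plus `sendfile`. The helper names and sample offsets are assumptions for illustration.

```python
import bisect

START_ADJUST = 7    # mirrors uint64_7 in the server code
END_ADJUST = 11     # mirrors uint64_11 in the server code

def byte_range(offset, sorted_offsets):
    """Map a title offset to an inclusive (start, end) byte range.

    Mirrors offsets.searchsorted(o, side='right'): the first offset strictly
    greater than `offset` marks where the next title begins.
    """
    o = abs(offset)  # the server code also takes the absolute value
    ix = bisect.bisect_right(sorted_offsets, o)
    if ix == len(sorted_offsets):
        raise ValueError("offset belongs to the last title; no next offset")
    return (o - START_ADJUST, sorted_offsets[ix] - END_ADJUST)

def read_range(path, start, end):
    """Read bytes start..end inclusive, like 'Range: bytes=start-end'."""
    with open(path, 'rb') as f:
        f.seek(start)
        return f.read(end - start + 1)
```

With `offsets = [10, 50, 120, 300]`, `byte_range(50, offsets)` returns `(43, 109)`. Unlike the server code, the sketch raises an explicit error for the last title instead of indexing past the end of the array.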

Download

PyParallel is alpha-quality software and should not be used in production.
All interfaces and implementation details are subject to change.

Windows 7/8/10 64-bit