Content from 2016-11
I've been reading about multi-threaded programming out of curiosity about lock-free algorithms.
The first thing I implemented was actually something different, namely a short example of memory reordering ported over from a C++ program.
In Common Lisp this relies on SBCL because of the availability of memory barriers and the Linux-specific API to set the thread affinity to a single CPU core (sb-cpu-affinity). Of course, given the appropriate compatibility libraries this could easily be done on other implementations too.
(in-package #:cl-user)
(defun main (&key single-cpu-p barrierp)
  (let ((begin-semaphore-1 (sb-thread:make-semaphore))
        (begin-semaphore-2 (sb-thread:make-semaphore))
        (end-semaphore (sb-thread:make-semaphore))
        (iterations 0)
        (detected 0)
        (x 0)
        (y 0)
        (r1 0)
        (r2 0)
        done)
    (declare (fixnum x y r1 r2))
    (declare (boolean done))
    (declare (optimize (speed 3) (safety 0) (space 0) (debug 0) (compilation-speed 0)))
    (labels ((single-cpu ()
               ;; optionally pin the calling thread to CPU core 0
               (when single-cpu-p
                 sb-cpu-affinity::(with-cpu-affinity-mask (mask :save T)
                                    (clear-cpu-affinity-mask mask)
                                    (setf (cpu-affinity-p 0 mask) T))))
             (thread-1 ()
               (single-cpu)
               (loop
                 (when done
                   (return))
                 (sb-thread:wait-on-semaphore begin-semaphore-1)
                 ;; store x, then load y; without a barrier the load may
                 ;; complete before the store is visible to the other thread
                 (setf x 1)
                 (when barrierp
                   (sb-thread:barrier (:memory)))
                 (setf r1 y)
                 (sb-thread:signal-semaphore end-semaphore)))
             (thread-2 ()
               (single-cpu)
               (loop
                 (when done
                   (return))
                 (sb-thread:wait-on-semaphore begin-semaphore-2)
                 ;; mirror image of THREAD-1: store y, then load x
                 (setf y 1)
                 (when barrierp
                   (sb-thread:barrier (:memory)))
                 (setf r2 x)
                 (sb-thread:signal-semaphore end-semaphore))))
      (let ((thread-1 (sb-thread:make-thread #'thread-1))
            (thread-2 (sb-thread:make-thread #'thread-2)))
        (unwind-protect
             (loop
               ;; reset, start both threads, then wait until both are done
               (setf x 0 y 0)
               (sb-thread:signal-semaphore begin-semaphore-1)
               (sb-thread:signal-semaphore begin-semaphore-2)
               (sb-thread:wait-on-semaphore end-semaphore)
               (sb-thread:wait-on-semaphore end-semaphore)
               ;; both loads saw 0: a store was reordered after the following load
               (when (and (eql r1 0) (eql r2 0))
                 (incf detected)
                 (format T "~D reorders detected after ~D iterations, every ~D on average~%" detected iterations (floor iterations detected)))
               (incf iterations))
          (setf done T)
          (sb-thread:signal-semaphore begin-semaphore-1)
          (sb-thread:signal-semaphore begin-semaphore-2)
          (sb-thread:join-thread thread-1)
          (sb-thread:join-thread thread-2))))))
On the JVM, or rather in Kotlin, the code again looks remarkably similar. I haven't yet looked at thread affinity, but two aspects related to the JVM memory model are interesting here. Using volatile is, in contrast to C, viable, as it generates the necessary memory barriers.
import java.util.concurrent.Semaphore
import kotlin.concurrent.thread
class HelloWorld {
    var beginSemaphore1 = Semaphore(0)
    var beginSemaphore2 = Semaphore(0)
    var endSemaphore = Semaphore(0)
    var detected = 0
    var iterations = 0
    @Volatile var x = 0
    @Volatile var y = 0
    var r1 = 0
    var r2 = 0
    fun run() {
        thread {
            while (true) {
                beginSemaphore1.acquire()
                x = 1
                r1 = y
                endSemaphore.release()
            }
        }
        thread {
            while (true) {
                beginSemaphore2.acquire()
                y = 1
                r2 = x
                endSemaphore.release()
            }
        }
        while (true) {
            iterations += 1
            x = 0
            y = 0
            beginSemaphore1.release()
            beginSemaphore2.release()
            endSemaphore.acquire()
            endSemaphore.acquire()
            if (r1 == 0 && r2 == 0) {
                detected++
                println("$detected reorders detected after $iterations iterations")
            }
        }
    }
}
fun main(args: Array<String>) {
    HelloWorld().run()
}
Also, with the HotSpot disassembler plugin (hsdis, enabled via -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly), the generated assembly code can be dumped during compilation, which lets us confirm whether those instructions have actually been generated. For the first loop:
0x00007f9ad4f503c9: mov (%rcx),%r11d ;*getfield this$0
; - HelloWorld$run$1::invoke@11 (line 21)
0x00007f9ad4f503cc: test %r11d,%r11d
0x00007f9ad4f503cf: je 0x00007f9ad4f507a9
0x00007f9ad4f503d5: movl $0x1,0x14(%r12,%r11,8)
0x00007f9ad4f503de: lock addl $0x0,(%rsp) ;*putfield x
; - HelloWorld::setX@2 (line 12)
; - HelloWorld$run$1::invoke@15 (line 21)
0x00007f9ad4f503e3: mov (%rcx),%r10d ;*getfield this$0
; - HelloWorld$run$1::invoke@19 (line 22)
0x00007f9ad4f503e6: test %r10d,%r10d
0x00007f9ad4f503e9: je 0x00007f9ad4f507cd ;*invokevirtual getY
; - HelloWorld$run$1::invoke@26 (line 22)
0x00007f9ad4f503ef: mov 0x18(%r12,%r10,8),%r11d ;*getfield y
; - HelloWorld::getY@1 (line 13)
; - HelloWorld$run$1::invoke@26 (line 22)
0x00007f9ad4f503f4: mov %r11d,0x1c(%r12,%r10,8) ;*putfield r1
; - HelloWorld::setR1@2 (line 14)
; - HelloWorld$run$1::invoke@29 (line 22)
And for the second loop:
0x00007f9ad4f47269: mov (%rcx),%r11d ;*getfield this$0
; - HelloWorld$run$2::invoke@11 (line 30)
0x00007f9ad4f4726c: test %r11d,%r11d
0x00007f9ad4f4726f: je 0x00007f9ad4f47629
0x00007f9ad4f47275: movl $0x1,0x18(%r12,%r11,8)
0x00007f9ad4f4727e: lock addl $0x0,(%rsp) ;*putfield y
; - HelloWorld::setY@2 (line 13)
; - HelloWorld$run$2::invoke@15 (line 30)
0x00007f9ad4f47283: mov (%rcx),%r10d ;*getfield this$0
; - HelloWorld$run$2::invoke@19 (line 31)
0x00007f9ad4f47286: test %r10d,%r10d
0x00007f9ad4f47289: je 0x00007f9ad4f4764d ;*invokevirtual getX
; - HelloWorld$run$2::invoke@26 (line 31)
0x00007f9ad4f4728f: mov 0x14(%r12,%r10,8),%r11d ;*getfield x
; - HelloWorld::getX@1 (line 12)
; - HelloWorld$run$2::invoke@26 (line 31)
0x00007f9ad4f47294: mov %r11d,0x20(%r12,%r10,8) ;*putfield r2
; - HelloWorld::setR2@2 (line 15)
; - HelloWorld$run$2::invoke@29 (line 31)
And the main loop:
0x00007fd383813838: lock addl $0x0,(%rsp) ;*putfield x
; - HelloWorld::run@58 (line 38)
0x00007fd38381383d: mov $0x0,%edx
0x00007fd383813842: mov %edx,0x18(%rsi)
0x00007fd383813845: lock addl $0x0,(%rsp) ;*putfield y
; - HelloWorld::run@63 (line 39)
0x00007fd38381384a: mov 0x24(%rsi),%edi
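Note that, in contrast to the original and the Common Lisp version, the resets of x and y in the main loop are volatile stores as well, so they also get a barrier (the lock addl $0x0,(%rsp) that HotSpot emits after volatile stores on x86). Here that is unnecessary, since the subsequent semaphore releases already make the reset values visible to the worker threads.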
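A minimal sketch of how those main-loop barriers could presumably be avoided, assuming java.util.concurrent.atomic.AtomicInteger is acceptable in place of the @Volatile fields. The workers keep using set/get (full volatile semantics), while the main thread resets x and y with lazySet, an ordered store that, as far as I know, HotSpot compiles to a plain mov on x86. The class and method names (ReorderTest, run) are made up, and I haven't checked the resulting assembly; that would be the next thing to verify with PrintAssembly. Because all worker accesses to x and y are still volatile, the Java memory model rules out r1 == r2 == 0, so the counter should stay at zero.

```kotlin
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical variant of HelloWorld: x and y are AtomicIntegers so that the
// main thread can reset them with lazySet (an ordered store without a trailing
// StoreLoad barrier) while the workers keep full volatile semantics.
class ReorderTest {
    private val begin1 = Semaphore(0)
    private val begin2 = Semaphore(0)
    private val end = Semaphore(0)
    private val x = AtomicInteger(0)
    private val y = AtomicInteger(0)
    @Volatile private var done = false
    // plain fields: published to the main thread via end.release()/end.acquire()
    private var r1 = 0
    private var r2 = 0

    /** Runs [rounds] rounds and returns how often r1 == r2 == 0 was observed. */
    fun run(rounds: Int): Int {
        val t1 = thread {
            while (true) {
                begin1.acquire()
                if (done) break
                x.set(1)       // volatile store
                r1 = y.get()   // volatile load
                end.release()
            }
        }
        val t2 = thread {
            while (true) {
                begin2.acquire()
                if (done) break
                y.set(1)
                r2 = x.get()
                end.release()
            }
        }
        var detected = 0
        repeat(rounds) {
            // ordered stores; the releases below publish them to the workers
            x.lazySet(0)
            y.lazySet(0)
            begin1.release()
            begin2.release()
            end.acquire()
            end.acquire()
            if (r1 == 0 && r2 == 0) detected++
        }
        // wake both workers one last time so they see the flag and exit
        done = true
        begin1.release()
        begin2.release()
        t1.join()
        t2.join()
        return detected
    }
}
```

Unlike the original, run performs a bounded number of rounds and then shuts the worker threads down via the done flag, so something like ReorderTest().run(100_000) terminates.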
How do you track everything you know? Computers and mobile phones have in turn become external memory; however, the data is still fragmented, stored in different formats and usable only in different programs.
While there are partial solutions to this issue, none seems all-encompassing enough.
For example, PIMs come close for communication and contacts, but it's hard to extend and customise them. You should be able to associate custom tags and key-value pairs with "documents"/entities. This leads straight into ontologies, since how to model many kinds of relations is a problem that will have to be dealt with.
Lastly, tracking all data only makes sense if there are reasonable search facilities and if exchange with other databases can be done with security in mind, especially if even the metadata is sensitive.
Occasionally the topic of asynchronous I/O on local files comes up: while there are APIs for event-based processing, they don't work on regular files, so worker threads are used to circumvent this restriction.
The inciting incident for me to look into this was that one of my (external, spinning-disk) hard drives has a rather short spin-down timeout, so when I paused while browsing through a directory tree, I was often annoyed to find the shell (or any other program) completely blocked while the motor was spinning up again.
At least on Linux (since version 2.5) there is actually a kernel syscall interface for asynchronous file I/O. Unfortunately it is not exposed via the libc at all, so custom wrappers are required in order to trigger the proper syscalls. Apart from scheduling asynchronous read and write requests, it also supports signalling completions via an eventfd file descriptor (a counter the kernel increments for each completed request), which then allows us to use epoll and friends.
With SBCL being the ultimate breadboard, that's not particularly hard, though. Using CFFI and IOLib for convenience, it's straightforward to port the C examples while writing a minimal amount of C code. The code is of course not very high-level, but it can be plugged straight into the IOLib event loop, as there's now a file descriptor available to listen on.
Grovelling & wrapping
The groveller can be used quite nicely to spare us from having to drop down to C completely. Of course we're also using ASDF, so everything's peachy.
Now what's required? CFFI provides additional components for ASDF, namely :CFFI-GROVEL-FILE and :CFFI-WRAPPER-FILE, which make the process seamless and don't require us to write any code related to compiling and loading the C wrapper:
;; -*- mode: lisp; syntax: common-lisp; coding: utf-8-unix; package: cl-user; -*-
(in-package #:cl-user)
(asdf:defsystem #:example
  :defsystem-depends-on (#:cffi-grovel)
  #+asdf-unicode :encoding #+asdf-unicode :utf-8
  :depends-on (#:iterate
               #:iolib
               #:cffi
               #:osicat)
  :serial T
  :components ((:module "src"
                :components
                ((:file "package")
                 (:file "wrapper")
                 (:cffi-grovel-file "linux-aio-grovel")
                 (:cffi-wrapper-file "linux-aio-wrapper")
                 (:file "linux-aio")))))
The package definition is probably not very interesting at this point:
(in-package #:cl-user)
(defpackage #:example
  (:use #:cl #:iterate #:iolib #:cffi))
I've added IOLib and CFFI, and as usual also ITERATE for convenience.
Next we grovel a couple of definitions related to the kernel API for the asynchronous requests and for eventfd. This is the linux-aio-grovel file mentioned above:
(in-package #:example)
(include "stdio.h" "unistd.h" "sys/syscall.h" "linux/aio_abi.h" "inttypes.h"
         "signal.h" "sys/eventfd.h")
(ctype aio-context-t "aio_context_t")
(cenum iocb-cmd-t
  ((:pread "IOCB_CMD_PREAD"))
  ((:pwrite "IOCB_CMD_PWRITE"))
  ((:fsync "IOCB_CMD_FSYNC"))
  ((:fdsync "IOCB_CMD_FDSYNC"))
  ((:noop "IOCB_CMD_NOOP"))
  ((:preadv "IOCB_CMD_PREADV"))
  ((:pwritev "IOCB_CMD_PWRITEV")))
(constantenum iocb-flags-t
  ((:resfd "IOCB_FLAG_RESFD")))
(cstruct iocb "struct iocb"
  (aio-data "aio_data" :type :uint64)
  ;; #-little-endian
  ;; (aio-reserved1 "aio_reserved1" :type :uint32)
  (aio-key "aio_key" :type :uint32)
  ;; #+little-endian
  ;; (aio-reserved1 "aio_reserved1" :type :uint32)
  (aio-lio-opcode "aio_lio_opcode" :type iocb-cmd-t)
  (aio-fildes "aio_fildes" :type :uint32)
  (aio-buf "aio_buf" :type :uint64)
  (aio-nbytes "aio_nbytes" :type :uint64)
  (aio-offset "aio_offset" :type :int64)
  ;; (aio-reserved2 "aio_reserved2" :type :uint64)
  (aio-flags "aio_flags" :type :uint32)
  (aio-resfd "aio_resfd" :type :uint32))
(cstruct io-event "struct io_event"
  (data "data" :type :uint64)
  (obj "obj" :type :uint64)
  (res "res" :type :int64)
  (res2 "res2" :type :int64))
(cenum eventfd-flags-t
  ((:cloexec "EFD_CLOEXEC"))
  ((:nonblock "EFD_NONBLOCK"))
  ((:semaphore "EFD_SEMAPHORE")))
Note that this is not a complete list, and a couple of reserved members are commented out, as they're primarily there to provide space for future expansion. Fortunately, the offsets of the remaining members aren't affected by leaving out parts in the Lisp-side definition, since the groveller obtains them from the C compiler.
The enums are easy enough, even though two of them (IOCB-FLAGS-T and EVENTFD-FLAGS-T) actually represent flags that should be OR-ed together; that might have to be done manually, or perhaps by finding a way to let CFFI do the coercion from a list of flags.
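For comparison, the manual route is just a bitwise OR over the flag values. A small Kotlin sketch, with the EFD_* values hard-coded as I understand them for x86-64 Linux (EFD_NONBLOCK and EFD_CLOEXEC follow O_NONBLOCK and O_CLOEXEC, which differ on some architectures; that is exactly why grovelling them is preferable). The enum and function names are made up. On the Lisp side, CFFI also has bitfield types (DEFBITFIELD, converted with FOREIGN-BITFIELD-VALUE) that accept a list of keywords, which might be the way to get the coercion for free.

```kotlin
// Hypothetical Kotlin mirror of the grovelled EVENTFD-FLAGS-T; values assumed
// for x86-64 Linux (EFD_CLOEXEC == O_CLOEXEC, EFD_NONBLOCK == O_NONBLOCK).
enum class EventfdFlag(val bits: Int) {
    SEMAPHORE(0x1),    // EFD_SEMAPHORE
    NONBLOCK(0x800),   // EFD_NONBLOCK (octal 04000)
    CLOEXEC(0x80000)   // EFD_CLOEXEC (octal 02000000)
}

/** Coerces a list of flags into the single integer the syscall expects. */
fun flagsToInt(flags: Iterable<EventfdFlag>): Int =
    flags.fold(0) { acc, flag -> acc or flag.bits }
```

Duplicates are harmless since OR is idempotent, and an empty list yields 0, the "no flags" value passed as the second argument of eventfd.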
In order to have nice syscall wrappers we'd normally use defsyscall from IOLib. Unfortunately we also want to use defwrapper from CFFI-GROVEL. This is an example of bad composability of macros, requiring copy and paste of source code. Of course, with enough refactoring or an optional parameter this could be circumvented. This is the wrapper file from the ASDF definition:
;; groan
cffi-grovel::(define-wrapper-syntax defwrapper/syscall* (name-and-options rettype args &rest c-lines)
  ;; output C code
  (multiple-value-bind (lisp-name foreign-name options)
      (cffi::parse-name-and-options name-and-options)
    (let ((foreign-name-wrap (strcat foreign-name "_cffi_wrap"))
          (fargs (mapcar (lambda (arg)
                           (list (c-type-name (second arg))
                                 (cffi::foreign-name (first arg) nil)))
                         args)))
      (format out "~A ~A" (c-type-name rettype)
              foreign-name-wrap)
      (format out "(~{~{~A ~A~}~^, ~})~%" fargs)
      (format out "{~%~{ ~A~%~}}~%~%" c-lines)
      ;; matching bindings
      (push `(iolib/syscalls:defsyscall (,foreign-name-wrap ,lisp-name ,@options)
                 ,(cffi-type rettype)
               ,@(mapcar (lambda (arg)
                           (list (symbol* (first arg))
                                 (cffi-type (second arg))))
                         args))
            *lisp-forms*))))
The only change from DEFWRAPPER is the use of IOLIB/SYSCALLS:DEFSYSCALL instead of DEFCFUN, which performs additional checks on the return value and raises an IOLIB/SYSCALLS:SYSCALL-ERROR that we can catch, rather than having to check the return value ourselves.
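To make the difference concrete, here is a rough Kotlin analogue of the check that DEFSYSCALL adds on top of a plain DEFCFUN binding: a negative return value (the raw syscall(2) returns -1 and sets errno on failure) is turned into an exception carrying the error number, which the caller can catch instead of testing every return value. SyscallError and checked are hypothetical names, and errno is passed in as a function because the JVM has no direct access to it; this only illustrates the idea, not IOLib's implementation.

```kotlin
/** Hypothetical error carrying the name of the failed call and its errno. */
class SyscallError(val call: String, val errno: Int) :
    RuntimeException("$call failed with errno $errno")

/**
 * Runs [syscall] and returns its result; a negative result (syscall(2)
 * returns -1 on failure) is turned into a SyscallError with the errno
 * reported by [errno].
 */
inline fun checked(name: String, errno: () -> Int, syscall: () -> Int): Int {
    val ret = syscall()
    if (ret < 0) throw SyscallError(name, errno())
    return ret
}
```

The HANDLER-CASE around the non-blocking eventfd read in the Looping section relies on exactly this: a read that would block fails with EAGAIN (11 on Linux) and arrives as a condition rather than as a return value.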
Lastly, the actual wrappers. Note that some inline C is used to define the function bodies. This is linux-aio-wrapper from the ASDF definition:
(define "_GNU_SOURCE")
(include "stdio.h" "unistd.h" "sys/syscall.h" "linux/aio_abi.h" "inttypes.h"
         "signal.h")
(defwrapper/syscall* "io_setup" :int
  ((nr :unsigned-int)
   (ctxp ("aio_context_t*" (:pointer aio-context-t))))
  "return syscall(__NR_io_setup, nr, ctxp);")
(defwrapper/syscall* "io_destroy" :int
  ((ctx aio-context-t))
  "return syscall(__NR_io_destroy, ctx);")
(defwrapper/syscall* "io_submit" :int
  ((ctx aio-context-t)
   (nr :long)
   (iocbpp ("struct iocb**" (:pointer (:pointer (:struct iocb))))))
  "return syscall(__NR_io_submit, ctx, nr, iocbpp);")
(defwrapper/syscall* "io_cancel" :int
  ((ctx aio-context-t)
   (iocbp ("struct iocb*" (:pointer (:struct iocb))))
   (result ("struct io_event*" (:pointer (:struct io-event)))))
  "return syscall(__NR_io_cancel, ctx, iocbp, result);")
(defwrapper/syscall* "io_getevents" :int
  ((ctx aio-context-t)
   (min-nr :long)
   (max-nr :long)
   (events ("struct io_event*" (:pointer (:struct io-event))))
   (timeout ("struct timespec*" (:pointer (:struct iolib/syscalls::timespec)))))
  "return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);")
Looping
Now that we have all definitions in place, let's translate a moderately complex example of reading from an existing file.
EVENTFD is defined directly here, since the C function is already provided by the libc and no wrapper has to be generated.
(iolib/syscalls:defsyscall eventfd :int
  (initval :unsigned-int)
  (flags eventfd-flags-t))
(defun linux-aio-test (pathname &key (chunk-size 4096))
  (with-foreign-object (context 'aio-context-t)
    ;; io_setup expects the context to be zero-initialised
    (iolib/syscalls:memset context 0 (foreign-type-size 'aio-context-t))
    (let ((eventfd (eventfd 0 :nonblock)))
      (unwind-protect
           (with-open-file (stream pathname :element-type '(unsigned-byte 8))
             (let* ((length (file-length stream))
                    (chunks (ceiling length chunk-size)))
               (with-foreign-object (buffer :uint8 length)
                 (with-event-base (event-base)
                   (io-setup 1 context) ; set up with number of possible operations
                   (with-foreign-object (iocbs '(:struct iocb) chunks)
                     (iolib/syscalls:memset iocbs 0 (* (foreign-type-size '(:struct iocb)) chunks))
                     ;; set up array of operations
                     (dotimes (i chunks)
                       (let ((iocb (mem-aptr iocbs '(:struct iocb) i)))
                         (setf (foreign-slot-value iocb '(:struct iocb) 'aio-lio-opcode) :pread)
                         (setf (foreign-slot-value iocb '(:struct iocb) 'aio-buf) (pointer-address (mem-aptr buffer :uint8 (* i chunk-size))))
                         (setf (foreign-slot-value iocb '(:struct iocb) 'aio-nbytes) (if (eql i (1- chunks)) (- length (* i chunk-size)) chunk-size))
                         (setf (foreign-slot-value iocb '(:struct iocb) 'aio-fildes) (sb-sys:fd-stream-fd stream))
                         (setf (foreign-slot-value iocb '(:struct iocb) 'aio-offset) (* i chunk-size))
                         ;; signal completion of this request on the eventfd
                         (setf (foreign-slot-value iocb '(:struct iocb) 'aio-flags) (foreign-enum-value 'iocb-flags-t :resfd))
                         (setf (foreign-slot-value iocb '(:struct iocb) 'aio-resfd) eventfd)))
                     ;; set up array of pointers to operations
                     (with-foreign-object (iocbp '(:pointer (:struct iocb)) chunks)
                       (dotimes (i chunks)
                         (setf (mem-aref iocbp '(:pointer (:struct iocb)) i) (mem-aptr iocbs '(:struct iocb) i)))
                       ;; submit as many operations as possible
                       (let ((submitted (io-submit (mem-ref context 'aio-context-t) chunks iocbp)))
                         ;; keep track of how many operations completed total
                         (let ((total-events-read 0))
                           (flet ((get-events () ; named to be able to RETURN-FROM
                                    (with-foreign-object (events '(:struct io-event) 3)
                                      (loop
                                        (handler-case
                                            (with-foreign-object (available-buffer :uint64)
                                              ;; the read returns the number of completions since the last read
                                              (iolib/syscalls:read eventfd available-buffer 8)
                                              (let ((available (mem-ref available-buffer :uint64)))
                                                (dotimes (i available)
                                                  (let ((events-read (io-getevents (mem-ref context 'aio-context-t) 0 3 events (null-pointer))))
                                                    (when (eql events-read 0)
                                                      (return))
                                                    (incf total-events-read events-read))
                                                  (when (eql total-events-read chunks)
                                                    (return-from linux-aio-test)))))
                                          ;; in case reading would block
                                          (iolib/syscalls:syscall-error ()
                                            (when (< submitted chunks)
                                              ;; pass only the operations that haven't been submitted yet
                                              (let ((more-submitted (io-submit (mem-ref context 'aio-context-t) (- chunks submitted) (mem-aptr iocbp '(:pointer (:struct iocb)) submitted))))
                                                (incf submitted more-submitted)))
                                            (return-from get-events)))))))
                             (set-io-handler
                              event-base eventfd :read
                              (lambda (fd event exception)
                                (declare (ignore fd event exception))
                                (get-events)))
                             (event-dispatch event-base))))))))))
        (io-destroy (mem-ref context 'aio-context-t))
        (iolib/syscalls:close eventfd)))))
Relatively straightforward. The complexity comes from submitting the chunks accurately, reading the number of available events on demand, and submitting a new batch of chunks as long as there are remaining ones.
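The chunk bookkeeping is the part that's easiest to get wrong, so here it is separated from the FFI details as a small Kotlin sketch with made-up names. chunksFor computes the offset and byte count of each request like the DOTIMES loop above, with a shorter last chunk. submitInBatches shows the resubmission rule: a follow-up submission has to be handed only the not-yet-submitted tail of the request array, together with its reduced length. The submit parameter stands in for io_submit; in the real code the next batch is only attempted once the completion queue has been drained, not in a tight loop.

```kotlin
/** One read request: mirrors the aio-offset and aio-nbytes slots of an iocb. */
data class Chunk(val offset: Long, val nbytes: Long)

/** Splits a file of [length] bytes into requests of at most [chunkSize] bytes. */
fun chunksFor(length: Long, chunkSize: Long): List<Chunk> {
    require(length >= 0 && chunkSize > 0)
    val count = ((length + chunkSize - 1) / chunkSize).toInt() // CEILING in the Lisp code
    return List(count) { i ->
        val offset = i * chunkSize
        // every chunk is full-sized except possibly the last one
        Chunk(offset, minOf(chunkSize, length - offset))
    }
}

/**
 * Submits [chunks] in batches and returns the number of submit calls.
 * [submit] stands in for io_submit: it receives only the requests that
 * haven't been submitted yet and returns how many of them it accepted.
 */
fun submitInBatches(chunks: List<Chunk>, submit: (List<Chunk>) -> Int): Int {
    var submitted = 0
    var calls = 0
    while (submitted < chunks.size) {
        val pending = chunks.subList(submitted, chunks.size)
        val accepted = submit(pending)
        check(accepted in 1..pending.size) { "submit made no progress" }
        submitted += accepted
        calls++
    }
    return calls
}
```

For a 10,000-byte file and 4096-byte chunks this yields requests at offsets 0, 4096 and 8192, the last one covering the remaining 1808 bytes.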
Insert FORMAT statements as you like. The values would also need some tuning in order to keep memory consumption in check. Finally
Outlook
We still can't do many local file operations asynchronously. The whole reason to jump through these hoops is of course to integrate potentially blocking operations into an event loop; some care still needs to be taken to do some work ahead of time or in separate threads, so as not to block the main part of the program from issuing the I/O requests.
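The "separate threads" route can be sketched on the JVM with an executor that runs the blocking calls, so the thread driving the event loop only ever deals with futures. This is a minimal sketch, assuming java.nio.file operations such as listing a directory or looking up a file size are the calls that may stall on a spinning-up disk; the class name BlockingFileOps and the pool size are arbitrary choices.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

/**
 * Hypothetical helper: runs file-system calls that may block (directory
 * listing, stat) on a small pool of daemon worker threads, so the thread
 * driving the event loop never waits on the disk itself.
 */
class BlockingFileOps(threads: Int = 2) : AutoCloseable {
    private val pool: ExecutorService = Executors.newFixedThreadPool(threads) { task ->
        Thread(task, "blocking-file-ops").apply { isDaemon = true }
    }

    /** Lists the entry names of [dir] on a worker thread, sorted. */
    fun list(dir: Path): CompletableFuture<List<String>> =
        CompletableFuture.supplyAsync({
            Files.newDirectoryStream(dir).use { entries ->
                entries.map { it.fileName.toString() }.sorted()
            }
        }, pool)

    /** Looks up the size of [file] in bytes on a worker thread. */
    fun size(file: Path): CompletableFuture<Long> =
        CompletableFuture.supplyAsync({ Files.size(file) }, pool)

    /** Stops accepting new work; already queued calls still complete. */
    override fun close() = pool.shutdown()
}
```

The loop thread would then pick up results via a continuation (for instance thenAccept, keeping in mind that its callback may run on a worker thread) rather than blocking on get().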