bsv

maximum performance data processing

github.com/nathants/bsv

why

it should be simple and easy to process data at the speed of sequential io.

what

a simple and efficient data format for easily manipulating chunks of rows of columns while minimizing allocations and copies.

minimal cli tools for rapidly composing performant data flow pipelines.

how

column: 0-65536 bytes.

row: 0-65536 columns.

chunk: up to 5MB containing 1 or more complete rows.

note: row data cannot exceed chunk size.

layout

chunk:

| i32:size | u8[]:row | ... |

row:

| u16:max | u16:size | ... | u8[]:column | ... |

note: column bytes are always followed by a single null byte.

note: max is the maximum zero based index into the row.
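
the row layout above can be sketched as a small serializer. this is a minimal illustration, not the code bsv ships: `encode_row` is a hypothetical name, and it assumes that each u16 size excludes the trailing null byte and that the host is little endian.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Serialize one row into buf following the row layout:
// u16 max, one u16 size per column, then each column's bytes
// followed by a single null byte.
// Assumption: sizes do not count the trailing null byte.
// Returns the number of bytes written. buf must be large enough.
size_t encode_row(uint8_t *buf, const char **columns, const uint16_t *sizes, int ncols) {
    assert(ncols >= 1 && ncols <= 65536);
    uint16_t max = (uint16_t)(ncols - 1);  // maximum zero based index
    size_t offset = 0;
    memcpy(buf + offset, &max, sizeof(max));
    offset += sizeof(max);
    for (int i = 0; i < ncols; i++) {      // header: one size per column
        memcpy(buf + offset, &sizes[i], sizeof(uint16_t));
        offset += sizeof(uint16_t);
    }
    for (int i = 0; i < ncols; i++) {      // body: column bytes plus null
        memcpy(buf + offset, columns[i], sizes[i]);
        offset += sizes[i];
        buf[offset++] = '\0';
    }
    return offset;
}
```

under these assumptions the row `a,bc` takes 11 bytes: 2 for max, 4 for the two sizes, and 5 for the column bytes with their null terminators.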

install

>> curl https://raw.githubusercontent.com/nathants/bsv/master/scripts/install_archlinux.sh | bash

>> git clone https://github.com/nathants/bsv
>> cd bsv
>> make -j
>> sudo mv -fv bin/* /usr/local/bin

note: for best pipeline performance increase maximum pipe size

>> sudo sysctl fs.pipe-max-size=5242880
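
the sysctl above only raises the limit; a process still has to ask for a larger pipe. on linux that request is made with `fcntl` and `F_SETPIPE_SZ`. the sketch below shows the call. `grow_pipe` is a hypothetical helper, and whether the bsv tools make this request themselves is not stated here.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

// Fallback for builds where the Linux specific constants are hidden.
#ifndef F_SETPIPE_SZ
#define F_SETPIPE_SZ 1031
#define F_GETPIPE_SZ 1032
#endif

// Ask the kernel to grow the buffer of the pipe behind fd. Linux only.
// Returns the resulting capacity in bytes, or -1 if the request was
// refused, for example when it exceeds fs.pipe-max-size for an
// unprivileged process.
int grow_pipe(int fd, int bytes) {
    assert(bytes > 0);
    if (fcntl(fd, F_SETPIPE_SZ, bytes) == -1)
        return -1;
    return fcntl(fd, F_GETPIPE_SZ);
}
```

the kernel may round the request up, so the returned capacity can be larger than the value asked for.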

test

>> tox

>> docker build -t bsv:debian -f Dockerfile.debian .
>> docker run -v $(pwd):/code --rm -it bsv:debian bash -c 'cd /code && py.test -vvx --tb native -n auto test/'

>> docker build -t bsv:alpine -f Dockerfile.alpine .
>> docker run -v $(pwd):/code --rm -it bsv:alpine bash -c 'cd /code && py.test -vvx --tb native -n auto test/'

increase the number of generated test cases with the environment variable: TEST_FACTOR=5

example

add bsumall.c to bsv/src/:

#include "util.h"
#include "load.h"
#include "dump.h"

#define DESCRIPTION "sum columns of u16 as i64\n\n"
#define USAGE "... | bsumall \n\n"
#define EXAMPLE ">> echo '\n1,2\n3,4\n' | bsv | bschema a:u16,a:u16 | bsumall i64 | bschema i64:a,i64:a | csv\n4,6\n"

int main(int argc, char **argv) {

    // setup state
    SETUP();
    readbuf_t rbuf = rbuf_init((FILE*[]){stdin}, 1, false);
    writebuf_t wbuf = wbuf_init((FILE*[]){stdout}, 1, false);
    i64 sums[MAX_COLUMNS] = {0};
    row_t row;

    // process input row by row
    while (1) {
        load_next(&rbuf, &row, 0);
        if (row.stop)
            break;
        for (i32 i = 0; i <= row.max; i++) {
            ASSERT(sizeof(u16) == row.sizes[i], "fatal: bad data\n");
            sums[i] += *(u16*)row.columns[i];
        }
    }

    // generate output row
    row.max = -1;
    for (i32 i = 0; i < MAX_COLUMNS; i++) {
        if (!sums[i])
            break;
        row.sizes[i] = sizeof(i64);
        row.columns[i] = &sums[i];
        row.max++;
    }

    // dump output
    if (row.max >= 0)
        dump(&wbuf, &row, 0);
    dump_flush(&wbuf, 0);
}

build and run:

>> ./scripts/makefile.sh

>> make bsumall

>> bsumall -h
sum columns of u16 as i64

usage: ... | bsumall

>> echo '
1,2
3,4
' | bsv | bschema a:u16,a:u16 | bsumall i64 | bschema i64:a,i64:a | csv
4,6

non goals

support of hardware other than little endian.

types and schemas as a part of the data format.

testing methodology

quickcheck style testing with python implementations to verify correct behavior for arbitrary inputs and varying buffer sizes.
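
the idea can be sketched as a property check: a simple reference implementation and a buffered implementation must agree for random inputs at every buffer size. the project's tests use python reference implementations; this sketch stays in c to match the example above, and all function names in it are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

// Reference implementation: count newline bytes in one pass.
size_t count_lines_reference(const uint8_t *data, size_t n) {
    size_t count = 0;
    for (size_t i = 0; i < n; i++)
        count += data[i] == '\n';
    return count;
}

// Implementation under test: the same count, but reading through a
// fixed size buffer the way a streaming tool would.
size_t count_lines_buffered(const uint8_t *data, size_t n, size_t bufsize) {
    assert(bufsize > 0 && bufsize <= 64);
    uint8_t buf[64];
    size_t count = 0;
    for (size_t offset = 0; offset < n; offset += bufsize) {
        size_t len = n - offset < bufsize ? n - offset : bufsize;
        memcpy(buf, data + offset, len);
        for (size_t i = 0; i < len; i++)
            count += buf[i] == '\n';
    }
    return count;
}

// Property: for random inputs and every buffer size from 1 to 64,
// both implementations agree. Returns 1 when all trials pass.
int check_property(unsigned seed, int trials) {
    srand(seed);
    uint8_t data[256];
    for (int t = 0; t < trials; t++) {
        size_t n = (size_t)(rand() % 257);  // input length 0..256
        for (size_t i = 0; i < n; i++)
            data[i] = (rand() % 4 == 0) ? '\n' : 'x';
        for (size_t bufsize = 1; bufsize <= 64; bufsize++)
            if (count_lines_buffered(data, n, bufsize) != count_lines_reference(data, n))
                return 0;
    }
    return 1;
}
```

varying the buffer size matters because bugs in streaming code tend to hide at buffer boundaries.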

experiments

performance experiments and alternate implementations.

related projects

s4 - a storage cluster that is cheap and fast, with data local compute and efficient shuffle.

related posts

optimizing a bsv data processing pipeline

performant batch processing with bsv, s4, and presto

discovering a baseline for data processing performance

refactoring common distributed data patterns into s4

scaling python data processing horizontally

scaling python data processing vertically

more examples

structured analysis of nyc taxi data with bsv and hive

tools

name              description
bcat              cat some bsv files to csv
bcombine          prepend a new column by combining values from existing columns
bcounteach        count as i64 each contiguous identical row by the first column
bcounteach-hash   count as i64 by hash of the first column
bcountrows        count rows as i64
bcut              select some columns
bdedupe           dedupe identical contiguous rows by the first column, keeping the first
bdedupe-hash      dedupe rows by hash of the first column, keeping the first
bdropuntil        for sorted input, drop until the first column is gte to VALUE
bhead             keep the first n rows
blz4              compress bsv data
blz4d             decompress bsv data
bmerge            merge sorted files from stdin
bpartition        split into multiple files by consistent hash of the first column value
bquantile-merge   merge ddsketches and output quantile value pairs as f64
bquantile-sketch  collapse the first column into a single row ddsketch
bschema           validate and convert row data with a schema of columns
bsort             timsort rows by the first column
bsplit            split a stream into multiple files
bsum              sum the first column
bsumeach          sum the second column of each contiguous identical row by the first column
bsumeach-hash     sum as i64 the second column by hash of the first column
bsv               convert csv to bsv
btake             take while the first column is VALUE
btakeuntil        for sorted input, take until the first column is gte to VALUE
btopn             accumulate the top n rows in a heap by first column value
bunzip            split a multi column input into single column outputs
bzip              combine single column inputs into a multi column output
csv               convert bsv to csv
xxh3              xxh3_64 hash stdin

bcat

cat some bsv files to csv

usage: bcat [-l|--lz4] [-p|--prefix] [-h N|--head N] FILE1 ... FILEN
>> for char in a a b b c c; do
     echo $char | bsv >> /tmp/$char
   done

>> bcat --head 1 --prefix /tmp/{a,b,c}
/tmp/a:a
/tmp/b:b
/tmp/c:c

bcombine

prepend a new column by combining values from existing columns

usage: ... | bcombine COL1,...,COLN
>> echo a,b,c | bsv | bcombine 3,2 | csv
c:b,a,b,c

bcounteach

count as i64 each contiguous identical row by the first column

usage: ... | bcounteach
>> echo '
a
a
b
b
b
a
' | bsv | bcounteach | bschema *,i64:a | csv
a,2
b,3
a,1
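
counting contiguous rows needs only the previous key, which is why `a` appears twice in the output above. a minimal sketch of that run counting, with the hypothetical name `count_each` and plain c strings standing in for bsv columns:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

// Count runs of contiguous identical keys. run_keys and run_counts
// receive one entry per run and must have room for n entries.
// Returns the number of runs.
int count_each(const char **keys, int n, const char **run_keys, int64_t *run_counts) {
    assert(n >= 0);
    int runs = 0;
    for (int i = 0; i < n; i++) {
        if (runs > 0 && strcmp(run_keys[runs - 1], keys[i]) == 0) {
            run_counts[runs - 1]++;    // same key as the previous row
        } else {
            run_keys[runs] = keys[i];  // key changed, start a new run
            run_counts[runs] = 1;
            runs++;
        }
    }
    return runs;
}
```

state stays constant in size no matter how many distinct keys exist, at the cost of requiring sorted or grouped input to get one count per key.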

bcounteach-hash

count as i64 by hash of the first column

usage: ... | bcounteach-hash
>> echo '
a
a
b
b
b
a
' | bsv | bcounteach-hash | bschema *,i64:a | bsort | csv
a,3
b,3

bcountrows

count rows as i64

usage: ... | bcountrows
>> echo '
1
2
3
4
' | bsv | bcountrows | csv
4

bcut

select some columns

usage: ... | bcut COL1,...,COLN
>> echo a,b,c | bsv | bcut 3,3,3,2,2,1 | csv
c,c,c,b,b,a

bdedupe

dedupe identical contiguous rows by the first column, keeping the first

usage: ... | bdedupe
>> echo '
a
a
b
b
a
a
' | bsv | bdedupe | csv
a
b
a

bdedupe-hash

dedupe rows by hash of the first column, keeping the first

usage: ... | bdedupe-hash
>> echo '
a
a
b
b
a
a
' | bsv | bdedupe-hash | csv
a
b

bdropuntil

for sorted input, drop until the first column is gte to VALUE

usage: ... | bdropuntil VALUE [TYPE]
>> echo '
a
b
c
d
' | bsv | bdropuntil c | csv
c
d

bhead

keep the first n rows

usage: ... | bhead N
>> echo '
a
b
c
' | bsv | bhead 2 | csv
a
b

blz4

compress bsv data

usage: ... | blz4
>> echo a,b,c | bsv | blz4 | blz4d | csv
a,b,c

blz4d

decompress bsv data

usage: ... | blz4d
>> echo a,b,c | bsv | blz4 | blz4d | csv
a,b,c

bmerge

merge sorted files from stdin

usage: echo FILE1 ... FILEN | bmerge [TYPE] [-r|--reversed] [-l|--lz4]
>> echo -e 'a
c
e
' | bsv > a.bsv
>> echo -e 'b
d
f
' | bsv > b.bsv
>> echo a.bsv b.bsv | bmerge
a
b
c
d
e
f
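
merging sorted inputs only ever compares the current head of each input. a two input sketch of that step, using plain c strings and the hypothetical name `merge_sorted`; bmerge itself handles any number of files and typed comparisons, which this does not attempt:

```c
#include <assert.h>
#include <string.h>

// Merge two sorted arrays of strings into out, preserving order.
// out must have room for na + nb entries. Returns the number written.
int merge_sorted(const char **a, int na, const char **b, int nb, const char **out) {
    assert(na >= 0 && nb >= 0);
    int i = 0, j = 0, k = 0;
    while (i < na && j < nb)  // take the smaller head, ties go to a
        out[k++] = strcmp(a[i], b[j]) <= 0 ? a[i++] : b[j++];
    while (i < na)            // drain whatever remains
        out[k++] = a[i++];
    while (j < nb)
        out[k++] = b[j++];
    return k;
}
```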

bpartition

split into multiple files by consistent hash of the first column value

usage: ... | bpartition NUM_BUCKETS [PREFIX] [-l|--lz4]
>> echo '
a
b
c
' | bsv | bpartition 10 prefix
prefix03
prefix06
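
the core of hash partitioning is that equal values always land in the same bucket. the sketch below shows hash then modulo with fnv-1a as a stand-in hash. both the hash and the modulo step are assumptions made for illustration; the scheme bpartition actually uses is not described here, so bucket numbers from this sketch will not match the output above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// FNV-1a 64 bit, used only as a stand-in hash for this sketch.
uint64_t fnv1a(const uint8_t *data, size_t n) {
    uint64_t hash = 14695981039346656037ULL;  // offset basis
    for (size_t i = 0; i < n; i++) {
        hash ^= data[i];
        hash *= 1099511628211ULL;             // FNV prime
    }
    return hash;
}

// Pick a bucket in [0, num_buckets) for a first column value.
// The same bytes always map to the same bucket.
int bucket_for(const uint8_t *value, size_t n, int num_buckets) {
    assert(num_buckets > 0);
    return (int)(fnv1a(value, n) % (uint64_t)num_buckets);
}
```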

bquantile-merge

merge ddsketches and output quantile value pairs as f64

usage: ... | bquantile-merge QUANTILES
>> seq 1 100 | bsv | bschema a:i64 | bquantile-sketch i64 | bquantile-merge .2,.5,.7 | bschema f64:a,f64:a | csv
0.2,19.88667024086646
0.5,49.90296094906742
0.7,70.11183939140405

bquantile-sketch

collapse the first column into a single row ddsketch

usage: ... | bquantile-sketch TYPE [-a|--alpha] [-b|--max-bins] [-m|--min-value]
>> seq 1 100 | bsv | bschema a:i64 | bquantile-sketch i64 | bquantile-merge .2,.5,.7 | bschema f64:a,f64:a | csv
0.2,19.88667024086646
0.5,49.90296094906742
0.7,70.11183939140405

bschema

validate and convert row data with a schema of columns

usage: ... | bschema SCHEMA [--filter]
  --filter remove bad rows instead of erroring

  example schemas:
    *,*,*             = 3 columns of any size
    8,*               = a column with 8 bytes followed by a column of any size
    8,*,...           = same as above, but ignore any trailing columns
    a:u16,a:i32,a:f64 = convert ascii to numerics
    u16:a,i32:a,f64:a = convert numerics to ascii
    4*,*4             = keep the first 4 bytes of column 1 and the last 4 of column 2

>> echo aa,bbb,cccc | bsv | bschema 2,3,4 | csv
aa,bbb,cccc
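
schema entries like a:u16 and u16:a convert between ascii text and fixed width numeric bytes. a rough sketch of both directions for u16 follows. the function names are hypothetical and the error handling is an assumption, not the behavior of bschema.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// Convert an ascii column such as "258" into the 2 raw bytes of a u16,
// roughly what a schema entry like a:u16 asks for.
// Returns 0 on success, -1 when the text is not a u16.
int ascii_to_u16(const char *text, uint8_t out[2]) {
    assert(text != NULL);
    char *end;
    errno = 0;
    long value = strtol(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0' || value < 0 || value > UINT16_MAX)
        return -1;
    uint16_t narrow = (uint16_t)value;
    memcpy(out, &narrow, sizeof(narrow));  // host byte order
    return 0;
}

// The reverse direction, as in u16:a. buf must hold at least 6 bytes:
// up to 5 digits plus the terminating null.
void u16_to_ascii(const uint8_t in[2], char *buf) {
    uint16_t value;
    memcpy(&value, in, sizeof(value));
    snprintf(buf, 6, "%u", (unsigned)value);
}
```

after conversion every value in the column has the same width, so downstream tools can check sizes and read values directly instead of parsing text.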

bsort

timsort rows by the first column

usage: ... | bsort [-r|--reversed] [TYPE]
>> echo '
3
2
1
' | bsv | bschema a:i64 | bsort i64 | bschema i64:a | csv
1
2
3

bsplit

split a stream into multiple files

usage: ... | bsplit PREFIX [chunks_per_file=1]
>> echo -n a,b,c | bsv | bsplit prefix
prefix_0000000000

bsum

sum the first column

usage: ... | bsum TYPE
>> echo '
1
2
3
4
' | bsv | bschema a:i64 | bsum i64 | bschema i64:a | csv
10

bsumeach

sum the second column of each contiguous identical row by the first column

usage: ... | bsumeach TYPE
>> echo '
a,1
a,2
b,3
b,4
b,5
a,6
' | bsv | bschema *,a:i64 | bsumeach i64 | bschema *,i64:a | csv
a,3
b,12
a,6

bsumeach-hash

sum as i64 the second column by hash of the first column

usage: ... | bsumeach-hash i64
>> echo '
a,1
a,2
b,3
b,4
b,5
a,6
' | bsv | bschema *,a:i64 | bsumeach-hash i64 | bschema *,i64:a | csv
a,9
b,12

bsv

convert csv to bsv

usage: ... | bsv
>> echo a,b,c | bsv | bcut 3,2,1 | csv
c,b,a

btake

take while the first column is VALUE

usage: ... | btake VALUE
>> echo '
a
b
c
d
' | bsv | bdropuntil c | btake c | csv
c

btakeuntil

for sorted input, take until the first column is gte to VALUE

usage: ... | btakeuntil VALUE [TYPE]
>> echo '
a
b
c
d
' | bsv | btakeuntil c | csv
a
b

btopn

accumulate the top n rows in a heap by first column value

usage: ... | btopn N [TYPE] [-r|--reversed]
>> echo '
1
3
2
' | bsv | bschema a:i64 | btopn 2 i64 | bschema i64:a | csv
3
2

bunzip

split a multi column input into single column outputs

usage: ... | bunzip PREFIX [-l|--lz4]
>> echo '
a,b,c
1,2,3
' | bsv | bunzip col && echo col_1 col_3 | bzip | csv
a,c
1,3

bzip

combine single column inputs into a multi column output

usage: ls column_* | bzip [COL1,...COLN] [-l|--lz4]
>> echo '
a,b,c
1,2,3
' | bsv | bunzip column && ls column_* | bzip 1,3 | csv
a,c
1,3

csv

convert bsv to csv

usage: ... | csv
>> echo a,b,c | bsv | csv
a,b,c

xxh3

xxh3_64 hash stdin

usage: ... | xxh3 [--stream|--int]
  --stream pass stdin through to stdout with hash on stderr
  --int output hash as int not hex

>> echo abc | xxh3
079364cbfdf9f4cb