PG: Postgres-related libraries for Clojure
The PG project holds a set of packages related to PostgreSQL. This is a
breakdown of my unsuccessful attempt to write a PostgreSQL client in pure
Clojure (stored in the _ directory). Although I didn’t achieve the goal, some
parts of the code are now shipped as separate packages and might be useful to
someone.
At the moment, the most interesting modules are pg-copy and pg-copy-jdbc. They
COPY the data using the binary Postgres format, which is faster than CSV.
pg-common
A set of common modules with utilities and constants. The most important
namespace is pg.oid, which is a registry of built-in OIDs in Postgres. It has
been generated directly from the pg_type.dat file stored in the official
Postgres repository.
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-common "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-common {:mvn/version "0.1.0"}
Usage
It’s unlikely that you’ll need this package directly.
pg-encode
A module to encode Clojure values into their Postgres counterparts. It is meant to cover both the text and binary formats; at the moment, it supports only the binary format for primitives, UUIDs and date/time types.
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-encode "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-encode {:mvn/version "0.1.0"}
Usage
Import the package:
(require 'pg.encode.bin)
(in-ns 'pg.encode.bin)
The encode function, in its simplest form, takes a value and returns a byte
array that represents the value in a Postgres-friendly binary format:
(encode 1)
[0, 0, 0, 0, 0, 0, 0, 1]
(type (encode 1))
[B
(encode true)
[1]
(encode false)
[0]
(encode "hello")
[104, 101, 108, 108, 111]
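To make the output above less magical, here is a minimal sketch of how such byte arrays can be produced with plain Java interop. It is not the library's implementation: the names long->bytes, bool->bytes and string->bytes are hypothetical and exist only for illustration. Postgres expects integers in network (big-endian) byte order, which is also the default order of java.nio.ByteBuffer.

```clojure
(import 'java.nio.ByteBuffer)

;; Hypothetical helpers, not part of pg.encode.bin.

(defn long->bytes
  "Packs a long into 8 big-endian bytes (the int8 layout)."
  [n]
  (-> (ByteBuffer/allocate 8)
      (.putLong (long n))
      (.array)))

(defn bool->bytes
  "A boolean is a single byte: 1 for true, 0 for false."
  [b]
  (byte-array [(if b 1 0)]))

(defn string->bytes
  "Text is sent as its UTF-8 bytes."
  [^String s]
  (.getBytes s "UTF-8"))

(vec (long->bytes 1))         ;; [0 0 0 0 0 0 0 1]
(vec (bool->bytes true))      ;; [1]
(vec (string->bytes "hello")) ;; [104 101 108 108 111]
```

Wrapping the result in vec is only for printing; the helpers themselves return byte arrays.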
Complex types like Date and UUID are supported as well:
(encode (new java.util.Date))
[0, 2, -99, -97, -61, 76, -42, -104]
(encode (random-uuid))
[-89, 69, -70, 4, -61, -17, 71, 112, -94, -57, -6, 47, -42, 41, 24, 62]
Type-specific encoding
Sometimes you need precise control over encoding. Say, a Long value 1 gets
encoded to int8 but you want it to be int4. The second argument of the encode
function takes an integer OID that specifies a column type. You can reach the
built-in OIDs using the pg.oid module.
Thus, to encode Long 1 as int4, do this:
(encode 1 pg.oid/int4)
[0, 0, 0, 1]
To encode an integer as int8, do:
(let [i (int 42)]
(encode i pg.oid/int8))
[0, 0, 0, 0, 0, 0, 0, 42]
The same applies to the Float and Double types. By default, a Float gets encoded to float4 and a Double to float8. Passing an explicit OID overrides the output type.
(encode 1.01 pg.oid/float4)
[63, -127, 71, -82]
(encode 1.01 pg.oid/float8)
[63, -16, 40, -11, -62, -113, 92, 41]
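The width of the result depends only on the target Postgres type, not on the Clojure type of the value. The sketch below reproduces the int4, float4 and float8 outputs shown above with java.nio.ByteBuffer. The helper names are hypothetical and this is not how pg.encode.bin is necessarily written.

```clojure
(import 'java.nio.ByteBuffer)

;; Hypothetical helpers that mirror the int4, float4 and float8 layouts.

(defn int4-bytes
  "Coerces a number to a 32-bit int and packs it into 4 bytes."
  [n]
  (-> (ByteBuffer/allocate 4) (.putInt (int n)) (.array)))

(defn float4-bytes
  "Coerces a number to a 32-bit float (IEEE 754 single precision)."
  [x]
  (-> (ByteBuffer/allocate 4) (.putFloat (float x)) (.array)))

(defn float8-bytes
  "Coerces a number to a 64-bit double (IEEE 754 double precision)."
  [x]
  (-> (ByteBuffer/allocate 8) (.putDouble (double x)) (.array)))

(vec (int4-bytes 1))      ;; [0 0 0 1]
(vec (float4-bytes 1.01)) ;; [63 -127 71 -82]
(vec (float8-bytes 1.01)) ;; [63 -16 40 -11 -62 -113 92 41]
```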
Table of supported types and OIDs
At the time of writing, the module has the following mapping between Clojure types and Postgres OIDs.
Clojure | Postgres | Default |
---|---|---|
Symbol | text, varchar | text |
String | text, varchar, uuid | text |
Character | text, varchar | text |
Long | int8, int4, int2 | int8 |
Integer | int8, int4, int2 | int4 |
Short | int8, int4, int2 | int2 |
Boolean | bool | bool |
Float | float4, float8 | float4 |
Double | float4, float8 | float8 |
UUID | uuid, text | uuid |
j.u.Date | timestamp, date | timestamp |
j.t.Instant | timestamp, date | timestamp |
j.t.LocalDate | date | date |
Extending the encoding rules
Encoding a type that is missing from the table above leads to an exception:
(encode {:foo 42} pg.oid/json)
Execution error (ExceptionInfo) at pg.error/error! (error.clj:14).
Cannot binary encode a value
{:value {:foo 42}, :oid 114, :opt nil}
But this can easily be fixed by extending the -encode multimethod from the
pg.encode.bin namespace. Its dispatch value is a vector where the first
item is the type of the value and the second is the OID:
(defmethod -encode [UUID oid/uuid]
  [^UUID value oid opt]
  (let [most-bits (.getMostSignificantBits value)
        least-bits (.getLeastSignificantBits value)]
    (byte-array
     (-> []
         (into (array/arr64 most-bits))
         (into (array/arr64 least-bits))))))
To extend the encoder with a map such that it becomes JSON in Postgres, use something like this:
(defmethod -encode [clojure.lang.IPersistentMap pg.oid/json]
  [mapping _ _]
  (-> mapping
      (cheshire.core/generate-string)
      (.getBytes "UTF-8")))
(encode {:foo 42} pg.oid/json)
[123, 34, 102, 111, 111, 34, 58, 52, 50, 125]
Let’s try the opposite: copy the output and restore the original string:
(-> [123, 34, 102, 111, 111, 34, 58, 52, 50, 125]
(byte-array)
(String. "UTF-8"))
"{\"foo\":42}"
Default OIDs
When no OID is passed, it’s nil. Thus, you must specify one more method for the
[Type nil] pair. This method is used when the encode function is called
with only one argument.
To keep the code short, there is a shortcut set-default in pg.encode.bin
that takes a type and an OID and clones the method declared for this pair into
the [Type nil] pair. The corresponding [Type OID] method must be declared in
advance, or you’ll get an exception.
Here is an example for the Integer type. It has three pairs for int8, int4 and int2, and the int4 case is set as the default.
(defmethod -encode [Integer oid/int8]
  [value oid opt]
  (-encode (long value) oid opt))
(defmethod -encode [Integer oid/int4]
  [value oid opt]
  (array/arr32 value))
(defmethod -encode [Integer oid/int2]
  [value oid opt]
  (-encode (short value) oid opt))
(set-default Integer oid/int4)
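The mechanics of cloning a method into the [Type nil] pair can be tried without the library. Below is a self-contained sketch built on a toy multimethod. The names -enc and set-default* are hypothetical, and the real set-default may be implemented differently. The numbers 20 and 23 are the built-in Postgres OIDs of int8 and int4.

```clojure
(import 'java.nio.ByteBuffer)

;; A toy multimethod, not the real pg.encode.bin/-encode.
;; It dispatches on a [type oid] vector.
(defmulti -enc
  (fn [value oid] [(type value) oid]))

(def int8-oid 20) ;; built-in Postgres OID of int8
(def int4-oid 23) ;; built-in Postgres OID of int4

(defmethod -enc [Long int8-oid]
  [value _]
  (-> (ByteBuffer/allocate 8) (.putLong (long value)) (.array)))

(defmethod -enc [Long int4-oid]
  [value _]
  (-> (ByteBuffer/allocate 4) (.putInt (int value)) (.array)))

(defn set-default*
  "Hypothetical analogue of set-default: registers the method declared
  for [klass oid] under [klass nil] as well. Throws when there is no
  such method yet."
  [klass oid]
  (if-let [method (get-method -enc [klass oid])]
    (.addMethod ^clojure.lang.MultiFn -enc [klass nil] method)
    (throw (ex-info "No method declared for this pair"
                    {:type klass :oid oid}))))

(set-default* Long int8-oid)

(vec (-enc 1 int4-oid)) ;; [0 0 0 1]
(vec (-enc 1 nil))      ;; [0 0 0 0 0 0 0 1]
```

Since the dispatch value for a one-argument call ends with nil, the cloned method is picked up without any extra branching in the encoder itself.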
pg-joda-time
Extends the encoding multimethod with Joda Time types.
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-joda-time "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-joda-time {:mvn/version "0.1.0"}
Usage
Import the pg.joda-time namespace to extend the -encode multimethod mentioned
above.
(encode (new org.joda.time.DateTime))
[0, 2, -99, -79, 88, 15, 48, -128]
Table of Types and OIDs
Clojure | Postgres | Default |
---|---|---|
o.j.t.LocalDate | date | date |
o.j.t.LocalTime | time | time |
o.j.t.DateTime | timestamp | timestamp |
pg-copy
A module that prepares an InputStream to be loaded into the database with COPY.
Uses the binary format and thus is a bit faster than CSV.
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-copy "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-copy {:mvn/version "0.1.0"}
Usage
The pg.copy namespace provides a set of functions related to the Postgres COPY
command.
The main function, data->input-stream, takes data and returns an input stream
that is passed to CopyManager. The data must be a sequence of sequences, each
of the same size. The stream contains a binary payload that Postgres knows how
to parse. Assuming you have a table with bigint, text and bool fields,
here is what building an input stream looks like:
(def data
  [[1 "User 1" true]
   [2 "User 2" false]
   [3 "User 3" nil]])
(def input
  (data->input-stream data))
(def sql-copy
  "COPY users(id, name, active) FROM STDIN WITH BINARY")
(def copy-mgr
  (new CopyManager <tcp-conn>))
(.copyIn copy-mgr sql-copy input)
If the data matches the table, you can skip the columns in the COPY expression
and type just COPY users FROM. But they are mandatory when you insert a
subset of columns or pass them in another order:
(def data
[["User 1" 1]
["User 2" 2]])
...
(def sql-copy
"COPY users(name, id) FROM STDIN WITH BINARY")
OID hints
Imagine you have a column of the integer type in a table. This type takes 4
bytes whereas the standard Long type in Java takes 8 bytes. If you
encode a Long value and COPY it into Postgres, it will reject the payload
as incorrect.
To solve the problem, either coerce the Long value to Integer or, which is better and simpler, specify that the Long column must be encoded as a 4-byte integer. This is known as OID hints.
The data->input-stream function accepts a map of options. The :oids field
might be either a vector or a map of Postgres OIDs:
(def oids [oid/int4])
(def data
[[1 "User 1" true]
[2 "User 2" false]
[3 "User 3" nil]])
(def input
(data->input-stream data {:oids oids}))
Above, we specify that the first column (Long values 1, 2, 3) must be encoded as 4-byte integers.
It’s not necessary to specify OIDs for all columns. Internally, the vector is
passed into the (get oids i) form where i is the index of a column. A nil
OID stands for the default encoding rule.
Another example where you specify the type of the third column:
(def oids [nil nil oid/int4])
(def data
[["User 1" true 1]
["User 2" false 2]
["User 3" nil 3]])
You can also use a map of index → OID where the index starts from zero:
(def oids {2 oid/int4})
(def data
[["User 1" true 1]
["User 2" false 2]
["User 3" nil 3]])
Finally, OID hints may carry not only integer OIDs but also their names, for example:
[nil nil "int4"]
{2 "int4"}
See the pg.oid namespace for their names and values.
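The reason both a vector and a map work as hints is that get treats a vector index and a map key alike and returns nil when nothing is found. A minimal sketch, where column-oid is a hypothetical helper and 23 is the built-in OID of int4:

```clojure
;; Hypothetical helper: resolves the hint for the i-th column.
;; A nil result means "use the default encoding rule".
(defn column-oid
  [oids i]
  (get oids i))

(def int4 23) ;; built-in Postgres OID of int4

(column-oid [nil nil int4] 2) ;; 23
(column-oid {2 int4} 2)       ;; 23
(column-oid [int4] 1)         ;; nil, the vector is shorter than the row
(column-oid {2 "int4"} 2)     ;; "int4", a name instead of a number
(column-oid nil 0)            ;; nil, no hints at all
```

How a name such as "int4" is turned into a number is up to the library and is not shown here.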
Hints in metadata
When the :oids field is not passed, the library makes an attempt to fetch the
hints from the metadata of the matrix. The metadata field is :pg/oids. There is
a function with-oids that supplies a matrix with the type hints as follows:
(def data
(with-oids
[["User 1" true 1]
["User 2" false 2]
["User 3" nil 3]]
{2 oid/int4}))
Then pass the data into the data->input-stream function without the :oids
option.
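Clojure metadata travels with the collection without changing its value, which is what makes this approach convenient. Here is a rough sketch of the idea with two hypothetical helpers, attach-oids and read-oids. The real with-oids may differ in details.

```clojure
;; Hypothetical stand-in for with-oids: stores the hints under :pg/oids.
(defn attach-oids
  [matrix oids]
  (vary-meta matrix assoc :pg/oids oids))

;; Hypothetical reader: what a consumer would do when :oids is missing.
(defn read-oids
  [matrix]
  (:pg/oids (meta matrix)))

(def plain
  [["User 1" true 1]
   ["User 2" false 2]])

(def hinted
  (attach-oids plain {2 23})) ;; 23 is the built-in OID of int4

(read-oids hinted) ;; {2 23}
(read-oids plain)  ;; nil
(= plain hinted)   ;; true, metadata does not affect equality
```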
Working with maps
The code shown above works with matrices although most often we deal with
maps. The latter can be transformed into the former with a helper function
called maps->data. It takes a seq of maps, the keys to select, and,
optionally, a map of key => OID for encoding.
(def rows
[{:name "User 1" :id 1 :active true}
{:name "User 2" :id 2 :active false}
{:name "User 3" :id 3 :active nil}])
(maps->data rows [:id :name])
([1 "User 1"]
[2 "User 2"]
[3 "User 3"])
An example with the third argument produces the same result but with an additional field in its metadata:
(maps->data rows [:id :name] {:id "int4"})
(meta *1)
{:pg/oids ["int4" nil]}
Since the matrix is already charged with OID hints, there is no need to pass
them into the :oids option.
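For intuition, the transformation can be sketched with juxt and with-meta. The function maps->matrix below is a hypothetical re-implementation written for this example only; unlike the output shown above, it returns a vector rather than a sequence.

```clojure
;; A hypothetical re-implementation, not the library's maps->data.
(defn maps->matrix
  ([rows ks]
   (maps->matrix rows ks nil))
  ([rows ks key->oid]
   (let [matrix (mapv (apply juxt ks) rows)]
     (if key->oid
       ;; positional hints: one item per selected key, nil if no hint
       (with-meta matrix {:pg/oids (mapv #(get key->oid %) ks)})
       matrix))))

(def rows
  [{:name "User 1" :id 1 :active true}
   {:name "User 2" :id 2 :active false}])

(maps->matrix rows [:id :name])
;; [[1 "User 1"] [2 "User 2"]]

(meta (maps->matrix rows [:id :name] {:id "int4"}))
;; {:pg/oids ["int4" nil]}
```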
Other functions
data->bytes acts the same as data->input-stream but dumps the payload into a byte array:
(data->bytes [[1 "User 1" true]])
[80, 71, 67, 79, 80, 89, 10, -1, 13, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0,
0, 8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 6, 85, 115, 101, 114, 32, 49, 0, 0, 0,
1, 1, -1, -1]
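The byte array above follows the binary COPY layout described in the PostgreSQL documentation: an 11-byte signature, a 32-bit flags field, a 32-bit header extension length, then for each row a 16-bit field count followed by the fields (a 32-bit length and the bytes, or a length of -1 for NULL), and finally a 16-bit -1 trailer. The sketch below builds the same payload by hand. It is not the library's code: rows->copy-bytes and field->bytes are hypothetical names, and only Long, String and Boolean values are handled.

```clojure
(import '(java.io ByteArrayOutputStream DataOutputStream)
        'java.nio.ByteBuffer)

(defn field->bytes
  "Hypothetical encoder for the three types used in the example."
  [value]
  (cond
    (instance? Long value)
    (-> (ByteBuffer/allocate 8) (.putLong (long value)) (.array))

    (string? value)
    (.getBytes ^String value "UTF-8")

    (boolean? value)
    (byte-array [(if value 1 0)])

    :else
    (throw (ex-info "Unsupported type" {:value value}))))

(defn rows->copy-bytes
  "Builds a binary COPY payload for a seq of rows."
  [rows]
  (let [buf (ByteArrayOutputStream.)
        out (DataOutputStream. buf)
        ;; the bytes of PGCOPY \n \377 \r \n \0
        ^bytes sig (byte-array [80 71 67 79 80 89 10 -1 13 10 0])]
    (.write out sig 0 (alength sig))
    (.writeInt out 0)                      ;; flags
    (.writeInt out 0)                      ;; header extension length
    (doseq [row rows]
      (.writeShort out (count row))        ;; number of fields
      (doseq [field row]
        (if (nil? field)
          (.writeInt out -1)               ;; NULL: length -1, no bytes
          (let [^bytes data (field->bytes field)]
            (.writeInt out (alength data)) ;; field length
            (.write out data 0 (alength data))))))
    (.writeShort out -1)                   ;; trailer
    (.toByteArray buf)))

(vec (rows->copy-bytes [[1 "User 1" true]]))
```

For the row [1 "User 1" true] this yields the same 50 bytes as the data->bytes output above.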
data->file saves the binary payload into a file:
(data->file [[1 "User 1" true]] "out.bin")
> od -c out.bin
0000000 P G C O P Y \n 377 \r \n \0 \0 \0 \0 \0 \0
0000020 \0 \0 \0 \0 001 \0 \0 \0 001 001 \0 001 \0 \0 \0 001
0000040 \0 \0 001 377 377 377 377 377 377
A more general function data->input-stream redirects the payload into an
instance of an OutputStream.
All the functions accept an additional map of options with the :oids
field.
pg-copy-jdbc
A wrapper on top of pg-copy that performs the COPY command using the
Postgres driver. Depends on [org.postgresql/postgresql "42.2.18"].
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-copy-jdbc "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-copy-jdbc {:mvn/version "0.1.0"}
Usage
The pg.copy.jdbc namespace provides a function copy-in that takes a
connection, a SQL expression with COPY, the data to load and the options. The
connection must be an instance of org.postgresql.jdbc.PgConnection rather than
a Clojure map. You can easily get that connection using next.jdbc:
(with-open [conn (jdbc/get-connection db-spec)]
...)
The function returns the number of records that have been loaded:
(def sql-copy
"COPY users (id, name) FROM STDIN WITH BINARY")
(def data
[[1 "User 1"]
[2 "User 2"]])
(copy-in conn sql-copy data)
;; 2
Everything said above about OID hints applies here as well. Say, if the id
column is of the integer type rather than bigint, specify the hint:
(copy-in conn sql-copy data {:oids ["int4"]})
;; or
(copy-in conn sql-copy (copy/with-oids data {0 oid/int4}))
Parallel COPY
The copy-in function loads the data in one thread, which is usually OK. Often,
though, you have millions of rows to load, which is time-consuming. Splitting
the rows into chunks and loading them in parallel using separate connections
might save you time.
The copy-in-parallel function does it for you. It accepts a DataSource
instance, a SQL expression, the data, the number of threads, the chunk
size and an optional map of options. Under the hood, it creates a new fixed
thread pool with the given number of threads. For each data chunk, it opens a
new connection that loads that chunk.
The datasource object can be obtained with next.jdbc as follows:
(let [ds (jdbc/get-datasource db-spec)]
...)
Here we COPY the user rows in parallel using 4 threads (and 4 simultaneous connections), 10K rows per chunk:
(def sql-copy
"COPY users (id, name) FROM STDIN WITH BINARY")
(def data
[[1 "User 1"]
[2 "User 2"]
... ;; 10M or so
])
(copy-in-parallel
  ds
  sql-copy
  data
  4
  10000
  {:oids {0 oid/int4 1 oid/text}})
;; 10M
The function returns the total number of rows copied, summing the intermediate results.
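The chunk-and-pool scheme described above can be sketched with partition-all and a fixed thread pool from java.util.concurrent. This is an illustration only, not the source of copy-in-parallel: the function copy-chunks-in-parallel and its load-chunk argument are hypothetical. In the real function, loading a chunk means opening a connection and running COPY; here load-chunk is any function that takes a chunk and returns the number of rows it loaded.

```clojure
(import '(java.util.concurrent Callable ExecutorService Executors Future))

(defn copy-chunks-in-parallel
  "Splits rows into chunks, loads each chunk on a pool thread
  and returns the sum of the per-chunk results."
  [load-chunk rows threads chunk-size]
  (let [^ExecutorService pool (Executors/newFixedThreadPool (int threads))]
    (try
      (let [;; mapv is eager, so every task is submitted before we wait
            futures (mapv (fn [chunk]
                            (let [^Callable task (fn [] (load-chunk chunk))]
                              (.submit pool task)))
                          (partition-all chunk-size rows))]
        (reduce + 0 (map (fn [^Future f] (.get f)) futures)))
      (finally
        (.shutdown pool)))))

;; A stub loader that just counts the rows of a chunk.
(copy-chunks-in-parallel count (range 25) 4 10) ;; 25
```

With 25 rows and a chunk size of 10, three tasks are submitted (10, 10 and 5 rows), and at most 4 of them run at the same time.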
Measurements
Let’s compare timings. The data was collected on an Apple M1 Max with 32 GB of RAM and 10 cores.
Plain COPY
Total | Format | Time, sec |
---|---|---|
10M | binary | 17.4 |
10M | CSV | 51.2 |
Parallel COPY
Binary:
Total | Threads | Chunk | Format | Time, sec |
---|---|---|---|---|
10M | 8 | 10k | binary | 11.3 |
10M | 4 | 10k | binary | 13.7 |
10M | 1 | 10k | binary | 28.6 |
CSV:
Total | Threads | Chunk | Format | Time, sec |
---|---|---|---|---|
10M | 8 | 10k | CSV | 10.6 |
10M | 4 | 10k | CSV | 19.9 |
10M | 1 | 10k | CSV | 71.7 |
Summary
These tables suggest the following rules:
- The more threads you allocate for COPY, the faster it goes. But the gain is not linear: doubling the threads from 4 to 8 doesn’t halve the time.
- With many threads, the difference between binary and CSV is not significant. But it is when you have only one thread: on 10M rows, the binary format beats CSV by 2.5 times.
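The ratios behind these statements can be checked directly from the tables above. The snippet is plain arithmetic on the measured timings; the var names are made up for this example.

```clojure
;; Timings in seconds, copied from the Parallel COPY tables.
(def binary {1 28.6, 4 13.7, 8 11.3})
(def csv    {1 71.7, 4 19.9, 8 10.6})

;; One thread: how many times is binary faster than CSV?
(/ (csv 1) (binary 1))    ;; about 2.5

;; Doubling the threads from 4 to 8: the speedup is well below 2.
(/ (binary 4) (binary 8)) ;; about 1.2
(/ (csv 4) (csv 8))       ;; about 1.9
```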