PG: Postgres-related libraries for Clojure
The PG project holds a set of packages related to PostgreSQL. It grew out of
my unsuccessful attempt to write a PostgreSQL client in pure Clojure (stored
in the _ directory). Although I didn’t achieve that goal, some parts of the
code are now shipped as separate packages and might be useful to someone.
At the moment, the most interesting modules are pg-copy and pg-copy-jdbc.
They COPY the data using the binary Postgres format, which is faster than CSV.
pg-common
A set of common modules with utilities and constants. The most important
namespace is pg.oid, which is a registry of built-in OIDs in Postgres. It was
generated directly from the pg_type.dat file stored in the official Postgres
repository.
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-common "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-common {:mvn/version "0.1.0"}
Usage
It’s unlikely that you’ll need this package directly.
pg-encode
A module to encode Clojure values into their Postgres counterparts, both text and binary. At the moment, it supports only the binary format for primitives, UUIDs and date/time types.
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-encode "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-encode {:mvn/version "0.1.0"}
Usage
Import the package:
(require 'pg.encode.bin)
(in-ns 'pg.encode.bin)
The encode function, in its simple case, takes a value and returns a byte
array which represents that value in a Postgres-friendly binary format:
(encode 1)
[0, 0, 0, 0, 0, 0, 0, 1]
(type (encode 1))
[B
(encode true)
[1]
(encode false)
[0]
(encode "hello")
[104, 101, 108, 108, 111]
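For intuition, the same bytes can be produced with plain JDK classes. The sketch below is not the library's implementation: the helper names `int8-bytes`, `bool-bytes` and `text-bytes` are made up for illustration, and it assumes only what the output above shows (big-endian integers, one byte per boolean, UTF-8 for text).

```clojure
(import '(java.nio ByteBuffer)
        '(java.nio.charset StandardCharsets))

(defn int8-bytes
  "Hypothetical helper: a Long as 8 bytes, most significant byte first."
  [n]
  (-> (ByteBuffer/allocate 8)   ; ByteBuffer is big-endian by default
      (.putLong (long n))
      (.array)))

(defn bool-bytes
  "Hypothetical helper: a boolean as a single byte, 1 or 0."
  [b]
  (byte-array [(if b 1 0)]))

(defn text-bytes
  "Hypothetical helper: a string as its UTF-8 bytes (an assumption)."
  [^String s]
  (.getBytes s StandardCharsets/UTF_8))

(vec (int8-bytes 1))       ;; => [0 0 0 0 0 0 0 1]
(vec (bool-bytes true))    ;; => [1]
(vec (text-bytes "hello")) ;; => [104 101 108 108 111]
```

Wrapping a byte array into `vec` is only a convenient way to print and compare it in the REPL.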
Complex types like Date and UUID are supported as well:
(encode (new java.util.Date))
[0, 2, -99, -97, -61, 76, -42, -104]
(encode (random-uuid))
[-89, 69, -70, 4, -61, -17, 71, 112, -94, -57, -6, 47, -42, 41, 24, 62]
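To show where these numbers come from, here is a rough sketch built on the JDK only. It relies on two facts about the Postgres binary format: a timestamp is an 8-byte count of microseconds since 2000-01-01 UTC, and a UUID is its 16 raw bytes. The names `pg-epoch`, `timestamp-bytes` and `uuid-bytes` are hypothetical and not part of the library.

```clojure
(import '(java.nio ByteBuffer)
        '(java.time Duration Instant)
        '(java.util UUID))

;; Postgres counts timestamps from 2000-01-01, not from the Unix epoch.
(def pg-epoch (Instant/parse "2000-01-01T00:00:00Z"))

(defn timestamp-bytes
  "Hypothetical helper: an Instant as microseconds since the Postgres epoch."
  [^Instant inst]
  (let [d      (Duration/between pg-epoch inst)
        micros (+ (* 1000000 (.getSeconds d))
                  (quot (.getNano d) 1000))]
    (-> (ByteBuffer/allocate 8)
        (.putLong (long micros))
        (.array))))

(defn uuid-bytes
  "Hypothetical helper: a UUID as 16 bytes, most significant half first."
  [^UUID u]
  (-> (ByteBuffer/allocate 16)
      (.putLong (.getMostSignificantBits u))
      (.putLong (.getLeastSignificantBits u))
      (.array)))

;; One second after the Postgres epoch is 1000000 microseconds.
(vec (timestamp-bytes (Instant/parse "2000-01-01T00:00:01Z")))
;; => [0 0 0 0 0 15 66 64]

;; The UUID whose bytes are printed in the example above.
(vec (uuid-bytes (UUID/fromString "a745ba04-c3ef-4770-a2c7-fa2fd629183e")))
;; => [-89 69 -70 4 -61 -17 71 112 -94 -57 -6 47 -42 41 24 62]
```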
Type-specific encoding
Sometimes you need precise control over encoding. Say, a Long value 1 gets
encoded to int8 but you want it to be int4. The second argument of the encode
function takes an integer OID that specifies a column type. You can reach the
built-in OIDs using the pg.oid module.
Thus, to encode Long 1 as int4, do this:
(encode 1 pg.oid/int4)
[0, 0, 0, 1]
To encode an integer as int8, do:
(let [i (int 42)]
  (encode i pg.oid/int8))
[0, 0, 0, 0, 0, 0, 0, 42]
The same applies to the Float and Double types. Float gets encoded to float4 by default and Double to float8. Passing an explicit OID changes the output type.
(encode 1.01 pg.oid/float4)
[63, -127, 71, -82]
(encode 1.01 pg.oid/float8)
[63, -16, 40, -11, -62, -113, 92, 41]
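The effect of an OID on the width of the output can be imitated with a plain `ByteBuffer`. This is only a sketch: `int-bytes`, `float4-bytes` and `float8-bytes` are hypothetical helpers, and the width is passed explicitly instead of being derived from an OID.

```clojure
(import '(java.nio ByteBuffer))

(defn int-bytes
  "Hypothetical helper: encode n into `width` bytes (2, 4 or 8), big-endian.
  The casts throw if n does not fit into the requested width."
  [n width]
  (let [buf (ByteBuffer/allocate (int width))]
    (case (long width)
      2 (.putShort buf (short n))
      4 (.putInt buf (int n))
      8 (.putLong buf (long n)))
    (.array buf)))

(defn float4-bytes
  "Hypothetical helper: a number as a 4-byte IEEE 754 float."
  [x]
  (-> (ByteBuffer/allocate 4) (.putFloat (float x)) (.array)))

(defn float8-bytes
  "Hypothetical helper: a number as an 8-byte IEEE 754 double."
  [x]
  (-> (ByteBuffer/allocate 8) (.putDouble (double x)) (.array)))

(vec (int-bytes 1 4))     ;; => [0 0 0 1]
(vec (int-bytes 42 8))    ;; => [0 0 0 0 0 0 0 42]
(vec (float4-bytes 1.01)) ;; => [63 -127 71 -82]
(vec (float8-bytes 1.01)) ;; => [63 -16 40 -11 -62 -113 92 41]
```

The last two results match the float4 and float8 output shown above.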
Table of supported types and OIDs
At the time of writing, the module has the following mapping between Clojure types and Postgres OIDs.
| Clojure | Postgres | Default |
|---|---|---|
| Symbol | text, varchar | text |
| String | text, varchar, uuid | text |
| Character | text, varchar | text |
| Long | int8, int4, int2 | int8 |
| Integer | int8, int4, int2 | int4 |
| Short | int8, int4, int2 | int2 |
| Boolean | bool | bool |
| Float | float4, float8 | float4 |
| Double | float4, float8 | float8 |
| UUID | uuid, text | uuid |
| j.u.Date | timestamp, date | timestamp |
| j.t.Instant | timestamp, date | timestamp |
| j.t.LocalDate | date | date |
Extending the encoding rules
Encoding a type that is missing from the table above leads to an exception:
(encode {:foo 42} pg.oid/json)
Execution error (ExceptionInfo) at pg.error/error! (error.clj:14).
Cannot binary encode a value
{:value {:foo 42}, :oid 114, :opt nil}
But it can easily be fixed by extending the -encode multimethod from the
pg.encode.bin namespace. Its dispatch function returns a vector where the first
item is the type of the value and the second is the OID:
(defmethod -encode [UUID oid/uuid]
  [^UUID value oid opt]
  (let [most-bits (.getMostSignificantBits value)
        least-bits (.getLeastSignificantBits value)]
    (byte-array
     (-> []
         (into (array/arr64 most-bits))
         (into (array/arr64 least-bits))))))
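If multimethods with a vector dispatch value are new to you, here is a minimal standalone sketch of the idea. It is not the library's code: `encode-sketch` and the `oid-int8` and `oid-int4` vars are made up, with the OID numbers 20 and 23 taken from the Postgres catalog.

```clojure
(import '(java.nio ByteBuffer))

;; OID numbers as assigned in the Postgres catalog.
(def oid-int8 20)
(def oid-int4 23)

(defmulti encode-sketch
  "Picks a method by the [type-of-value oid] pair."
  (fn [value oid opt] [(type value) oid]))

(defmethod encode-sketch [Long oid-int8]
  [value _ _]
  (-> (ByteBuffer/allocate 8) (.putLong (long value)) (.array)))

(defmethod encode-sketch [Long oid-int4]
  [value _ _]
  (-> (ByteBuffer/allocate 4) (.putInt (int value)) (.array)))

;; Anything without a registered pair ends up here.
(defmethod encode-sketch :default
  [value oid opt]
  (throw (ex-info "Cannot binary encode a value"
                  {:value value :oid oid :opt opt})))

(vec (encode-sketch 1 oid-int4 nil)) ;; => [0 0 0 1]
(vec (encode-sketch 1 oid-int8 nil)) ;; => [0 0 0 0 0 0 0 1]
```

Supporting a new type is then a matter of one more `defmethod` for the pair you need.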
To teach the encoder to turn a map into JSON in Postgres, use something like this:
(defmethod -encode [clojure.lang.IPersistentMap pg.oid/json]
  [mapping _ _]
  (-> mapping
      (cheshire.core/generate-string)
      (.getBytes "UTF-8")))
(encode {:foo 42} pg.oid/json)
[123, 34, 102, 111, 111, 34, 58, 52, 50, 125]
Let’s try the opposite: copy the output and restore the original string:
(-> [123, 34, 102, 111, 111, 34, 58, 52, 50, 125]
    (byte-array)
    (String. "UTF-8"))
"{\"foo\":42}"
Default OIDs
When no OID is passed, it’s nil. Thus, you must specify one more method for the
[Type, nil] pair. This method is used when the encode function is called
with only one argument.
To keep the code short, there is a shortcut set-default in pg.encode.bin
that takes a type, an OID and clones the method declared for this pair into the
[Type nil] pair. The corresponding [Type OID] method should be declared in
advance or you’ll get an exception.
Here is an example for the Integer type. It has three pairs for int8, int4 and int2, and the int4 case is set as default.
(defmethod -encode [Integer oid/int8]
  [value oid opt]
  (-encode (long value) oid opt))
(defmethod -encode [Integer oid/int4]
  [value oid opt]
  (array/arr32 value))
(defmethod -encode [Integer oid/int2]
  [value oid opt]
  (-encode (short value) oid opt))
(set-default Integer oid/int4)
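Cloning a method from one dispatch value to another can be done with the standard `methods` function and the `addMethod` method of `clojure.lang.MultiFn`. The sketch below shows the idea on a toy multimethod; `set-default-sketch` and `width` are hypothetical names, and the real `set-default` may be implemented differently.

```clojure
(defn set-default-sketch
  "Registers the method declared for [klass oid] under [klass nil] as well."
  [^clojure.lang.MultiFn multi klass oid]
  (if-let [method (get (methods multi) [klass oid])]
    (.addMethod multi [klass nil] method)
    (throw (ex-info "Declare the [Type OID] method first"
                    {:type klass :oid oid}))))

;; A toy multimethod to try it on; 23 is the OID of int4.
(defmulti width (fn [value oid] [(type value) oid]))

(defmethod width [Integer 23] [_ _] 4)

(set-default-sketch width Integer 23)

(width (int 1) 23)  ;; => 4
(width (int 1) nil) ;; => 4, served by the cloned method
```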
pg-joda-time
Extends the encoding multimethod with Joda Time types.
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-joda-time "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-joda-time {:mvn/version "0.1.0"}
Usage
Import the pg.joda-time namespace to extend the -encode multimethod mentioned
above.
(encode (new org.joda.time.DateTime))
[0, 2, -99, -79, 88, 15, 48, -128]
Table of Types and OIDs
| Clojure | Postgres | Default |
|---|---|---|
| o.j.t.LocalDate | date | date |
| o.j.t.LocalTime | time | time |
| o.j.t.DateTime | timestamp | timestamp |
pg-copy
A module to prepare an InputStream to be copied into the database with COPY.
It uses the binary format and thus is faster than CSV.
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-copy "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-copy {:mvn/version "0.1.0"}
Usage
The pg.copy namespace provides a set of functions related to the Postgres
COPY command.
The main data->input-stream function takes data and returns an input stream
that is passed to CopyManager. The data must be a sequence of sequences, each
of the same size. The stream contains a binary payload that Postgres knows how
to parse. Assuming you have a table with bigint, text and bool fields,
here is what building an input stream looks like:
(def data
  [[1 "User 1" true]
   [2 "User 2" false]
   [3 "User 3" nil]])
(def input
  (data->input-stream data))
(def sql-copy
  "COPY users(id, name, active) FROM STDIN WITH BINARY")
(def copy-mgr
  (new CopyManager <tcp-conn>))
(.copyIn copy-mgr sql-copy input)
If the data matches the table, you can skip the columns in the COPY expression
and type just COPY users FROM. But they are mandatory when you insert a
subset of columns or pass them in a different order:
(def data
  [["User 1" 1]
   ["User 2" 2]])
...
(def sql-copy
  "COPY users(name, id) FROM STDIN WITH BINARY")
OID hints
Imagine you have a column of the integer type in a table. This type takes 4
bytes whereas the standard Long type in Java takes 8 bytes. If you encode a
Long value and COPY it into Postgres, the server will reject the payload as
incorrect.
To solve the problem, either coerce the Long value to Integer or, which is better and simpler, specify that the Long column must be encoded as a 4-byte integer. This is known as OID hints.
The data->input-stream function accepts a map of options. The :oids field
might be either a vector or a map of Postgres OIDs:
(def oids [oid/int4])
(def data
  [[1 "User 1" true]
   [2 "User 2" false]
   [3 "User 3" nil]])
(def input
  (data->input-stream data {:oids oids}))
Above, we specify that the first column (Long values 1, 2, 3) must be encoded as 4-byte integers.
It’s not necessary to specify OIDs for all columns. Internally, the vector is
passed into the (get oids i) form where the i is an index of a column. A nil
OID stands for the default encoding rule.
Another example where you specify the type of the third column:
(def oids [nil nil oid/int4])
(def data
  [["User 1" true 1]
   ["User 2" false 2]
   ["User 3" nil 3]])
You can also use a map of index → OID where the index starts from zero:
(def oids {2 oid/int4})
(def data
  [["User 1" true 1]
   ["User 2" false 2]
   ["User 3" nil 3]])
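The reason both a vector and a map work is that `get` treats them alike: it looks up an index in a vector and a key in a map, and returns nil when nothing is found. A small sketch, where `row-with-oids` is a made-up helper and 23 is the OID of int4:

```clojure
(def int4 23)

(defn row-with-oids
  "Hypothetical helper: pairs each value of a row with its OID hint, if any."
  [row oids]
  (map-indexed (fn [i value] [value (get oids i)]) row))

;; A vector hint and a map hint give the same result.
(row-with-oids ["User 1" true 1] [nil nil int4])
;; => (["User 1" nil] [true nil] [1 23])
(row-with-oids ["User 1" true 1] {2 int4})
;; => (["User 1" nil] [true nil] [1 23])

;; A short vector covers only the leading columns.
(row-with-oids [1 "User 1" true] [int4])
;; => ([1 23] ["User 1" nil] [true nil])
```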
Finally, OID hints may carry not only integer OIDs but also their names, for example:
[nil nil "int4"]
{2 "int4"}
See the pg.oid namespace for their names and values.
Hints in metadata
When the :oids field is not passed, the library makes an attempt to fetch the
hints from the metadata of a matrix. Their field is :pg/oids. There is a
function with-oids that supplies a matrix with the type hints as follows:
(def data
  (with-oids
    [["User 1" true 1]
     ["User 2" false 2]
     ["User 3" nil 3]]
    {2 oid/int4}))
Then you pass the data into the data->input-stream function without the
:oids option.
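Metadata in Clojure travels with a collection without affecting its value, which is what makes this approach possible. Here is a sketch of how such hints could be attached and resolved; `with-oids-sketch` and `resolve-oids` are hypothetical names and the library's own logic may differ.

```clojure
(def int4 23) ;; the OID of int4

(defn with-oids-sketch
  "Hypothetical helper: stores the hints under :pg/oids in the metadata."
  [data oids]
  (vary-meta data assoc :pg/oids oids))

(defn resolve-oids
  "Hypothetical helper: an explicit :oids option wins over the metadata."
  [data opt]
  (or (:oids opt)
      (:pg/oids (meta data))))

(def data
  (with-oids-sketch
    [["User 1" true 1]
     ["User 2" false 2]]
    {2 int4}))

(resolve-oids data nil)                      ;; => {2 23}
(resolve-oids data {:oids [nil nil "int4"]}) ;; => [nil nil "int4"]

;; The data itself is unchanged by the metadata.
(= data [["User 1" true 1] ["User 2" false 2]]) ;; => true
```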
Working with maps
The code shown above works with matrices, although most often we deal with
maps. The latter can be transformed into the former with a helper function
called maps->data. It takes a seq of maps, the keys to select, and,
optionally, a map of key => OID for encoding.
(def rows
  [{:name "User 1" :id 1 :active true}
   {:name "User 2" :id 2 :active false}
   {:name "User 3" :id 3 :active nil}])
(maps->data rows [:id :name])
([1 "User 1"]
 [2 "User 2"]
 [3 "User 3"])
An example with the third argument produces the same result but with an additional field in its metadata:
(maps->data rows [:id :name] {:id "int4"})
(meta *1)
{:pg/oids ["int4" nil]}
Since the matrix is already charged with OID hints, there is no need to pass
them into the :oids option.
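For illustration, the same transformation can be written in a few lines with `juxt`. This is a sketch of the behaviour described above rather than the library's source, and the name `maps->data-sketch` is made up.

```clojure
(defn maps->data-sketch
  "Hypothetical helper: selects the keys from each map, in order, and
  optionally stores the per-column OID hints in the metadata."
  ([rows ks]
   (maps->data-sketch rows ks nil))
  ([rows ks key->oid]
   (let [data (map (apply juxt ks) rows)]
     (if key->oid
       (with-meta data {:pg/oids (mapv #(get key->oid %) ks)})
       data))))

(def rows
  [{:name "User 1" :id 1 :active true}
   {:name "User 2" :id 2 :active false}
   {:name "User 3" :id 3 :active nil}])

(maps->data-sketch rows [:id :name])
;; => ([1 "User 1"] [2 "User 2"] [3 "User 3"])

(meta (maps->data-sketch rows [:id :name] {:id "int4"}))
;; => {:pg/oids ["int4" nil]}
```

Note how the hints are keyed by map keys on input but end up as a vector aligned with the selected columns.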
Other functions
data->bytes acts the same but dumps the payload into a byte array:
(data->bytes [[1 "User 1" true]])
[80, 71, 67, 79, 80, 89, 10, -1, 13, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0,
0, 8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 6, 85, 115, 101, 114, 32, 49, 0, 0, 0,
1, 1, -1, -1]
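The array above follows the binary COPY layout documented by Postgres: an 11-byte signature, a 4-byte flags field, a 4-byte header extension length, then for each row a 2-byte field count followed by length-prefixed fields (a length of -1 means NULL), and finally a 2-byte trailer of -1. The sketch below assembles such a payload by hand from fields that are already encoded; `copy-payload` is a hypothetical function, not part of pg-copy.

```clojure
(import '(java.io ByteArrayOutputStream DataOutputStream))

;; "PGCOPY\n\377\r\n\0"
(def signature
  (byte-array [80 71 67 79 80 89 10 -1 13 10 0]))

(defn copy-payload
  "Hypothetical helper: rows are sequences of already encoded fields,
  each either a byte array or nil for NULL."
  [rows]
  (let [baos (ByteArrayOutputStream.)
        out  (DataOutputStream. baos)]
    (.write out ^bytes signature)
    (.writeInt out 0)                   ; flags
    (.writeInt out 0)                   ; header extension length
    (doseq [row rows]
      (.writeShort out (count row))     ; number of fields in the row
      (doseq [field row]
        (if (nil? field)
          (.writeInt out -1)            ; NULL: length -1 and no data
          (do
            (.writeInt out (alength ^bytes field))
            (.write out ^bytes field)))))
    (.writeShort out -1)                ; trailer
    (.toByteArray baos)))

;; The row [1 "User 1" true] with its fields encoded by hand.
(def row
  [(byte-array [0 0 0 0 0 0 0 1])
   (.getBytes "User 1" "UTF-8")
   (byte-array [1])])

(vec (copy-payload [row]))
```

The result is the same 50 bytes as printed above, which makes it easier to see which part of the payload is which.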
data->file saves the binary payload into a file:
(data->file [[1 "User 1" true]] "out.bin")
> od -c out.bin
0000000 P G C O P Y \n 377 \r \n \0 \0 \0 \0 \0 \0
0000020 \0 \0 \0 \0 001 \0 \0 \0 001 001 \0 001 \0 \0 \0 001
0000040 \0 \0 001 377 377 377 377 377 377
A more general function data->input-stream redirects the payload into an
instance of an OutputStream.
All the functions accept an additional map of options with the :oids field.
pg-copy-jdbc
A wrapper on top of pg-copy that performs the COPY command using the
Postgres driver. Depends on [org.postgresql/postgresql "42.2.18"].
Installation
Leiningen/Boot:
[com.github.igrishaev/pg-copy-jdbc "0.1.0"]
Clojure CLI/deps.edn:
com.github.igrishaev/pg-copy-jdbc {:mvn/version "0.1.0"}
Usage
The pg.copy.jdbc namespace carries a function copy-in that takes a
connection, a SQL expression with COPY, the data to load and the options. The
connection must be an instance of org.postgresql.jdbc.PgConnection rather than
a Clojure map. You can easily get that connection using next.jdbc:
(with-open [conn (jdbc/get-connection db-spec)]
  ...)
The function returns the number of records that have been loaded:
(def sql-copy
  "COPY users (id, name) FROM STDIN WITH BINARY")
(def data
  [[1 "User 1"]
   [2 "User 2"]])
(copy-in conn sql-copy data)
;; 2
Everything said above about OID hints applies here as well. Say, if the id
column is of the integer type rather than bigint, specify the hint:
(copy-in conn sql-copy data {:oids ["int4"]})
;; or
(copy-in conn sql-copy (copy/with-oids data {0 oid/int4}))
Parallel COPY
The copy-in function loads the data in one thread, which is usually OK. Often,
though, you have millions of rows to load, which is time-consuming. Splitting
the rows into chunks and loading them in parallel using separate connections
might save you time.
The copy-in-parallel function does it for you. It accepts a DataSource
instance, a SQL expression, the data, the number of threads and the chunk
size. Under the hood, it creates a new fixed thread pool with the given number
of threads. For each data chunk, it opens a new connection that loads that chunk.
The datasource object can be obtained with next.jdbc as follows:
(let [ds (jdbc/get-datasource db-spec)]
  ...)
Here we COPY the user rows in parallel using 4 threads (and 4 simultaneous connections), 10K rows per chunk:
(def sql-copy
  "COPY users (id, name) FROM STDIN WITH BINARY")
(def data
  [[1 "User 1"]
   [2 "User 2"]
   ... ;; 10M or so
   ])
(copy-in-parallel
 ds
 sql-copy
 data
 4
 10000
 {:oids {0 oid/int4 1 oid/text}})
;; 10M
The function returns the total number of rows copied, which is the sum of the intermediate results.
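The chunk-and-pool scheme described above can be sketched without a database. In the code below, `process-in-parallel` is a hypothetical function and `load-chunk!` stands in for whatever opens a connection and COPYs one chunk; it only has to return the number of rows it handled.

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

(defn process-in-parallel
  "Hypothetical sketch: splits rows into chunks, runs load-chunk! on each
  chunk in a fixed thread pool and sums up the returned counters."
  [load-chunk! rows threads chunk-size]
  (let [^ExecutorService pool (Executors/newFixedThreadPool (int threads))]
    (try
      (let [tasks   (mapv (fn [chunk]
                            ;; Clojure functions implement Callable
                            (fn [] (load-chunk! chunk)))
                          (partition-all chunk-size rows))
            futures (.invokeAll pool ^java.util.Collection tasks)]
        ;; .get blocks until the corresponding task is done
        (reduce + 0 (map (fn [^Future f] (.get f)) futures)))
      (finally
        (.shutdown pool)))))

;; A stand-in loader that just counts the rows of a chunk.
(process-in-parallel count (range 25) 4 10)
;; => 25
```

With 25 rows and a chunk size of 10, the loader is called three times with 10, 10 and 5 rows.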
Measurements
Let’s compare timings. The data was collected on an Apple M1 Max with 32 GB of RAM and 10 cores.
Plain COPY
| Total | Format | Time, sec |
|---|---|---|
| 10M | binary | 17.4 |
| 10M | CSV | 51.2 |
Parallel COPY
Binary:
| Total | Threads | Chunk | Format | Time, sec |
|---|---|---|---|---|
| 10M | 8 | 10k | binary | 11.3 |
| 10M | 4 | 10k | binary | 13.7 |
| 10M | 1 | 10k | binary | 28.6 |
CSV:
| Total | Threads | Chunk | Format | Time, sec |
|---|---|---|---|---|
| 10M | 8 | 10k | CSV | 10.6 |
| 10M | 4 | 10k | CSV | 19.9 |
| 10M | 1 | 10k | CSV | 71.7 |
Summary
These tables suggest the following rules:
- The more threads you allocate for COPY, the faster it goes. But the gain is not linear: doubling the threads from 4 to 8 doesn’t cut the time in half.
- With a lot of threads, the difference between binary and CSV is not significant. But it is when you have only one thread: on 10M rows, the binary format beats CSV by a factor of 2.5.
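The ratios behind these statements are easy to recompute from the tables. The `speedup` helper below is introduced only for this check.

```clojure
(defn speedup
  "How many times faster the second timing is than the first."
  [slow fast]
  (/ slow fast))

(speedup 71.7 28.6) ;; about 2.5:  CSV vs binary, 1 thread
(speedup 13.7 11.3) ;; about 1.2:  binary, 4 threads vs 8
(speedup 19.9 10.6) ;; about 1.9:  CSV, 4 threads vs 8
(speedup 10.6 11.3) ;; about 0.94: CSV vs binary, 8 threads
```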