PG2 release 0.1.15
PG2 version 0.1.15 is out. This version mostly ships improvements to connection pool and folders (reducers) of a database result. There are two new sections in the documentation that describe each part. I reproduce them below.
Connection Pool
Problem: every time you connect to the database, it takes time to open a socket, pass authentication pipeline and receive initial data from the server. From the server’s prospective, a new connection spawns a new process which is also an expensive operation. If you open a connection per a query, your application is about ten times slower than it could be.
Connection pools solve that problem. A pool holds a set of connections opened in advance, and you borrow them from a pool. When borrowed, a connection cannot be shared with somebody else any longer. Once you’ve done with your work, you return the connection to the pool, and it’s available for other consumers.
PG2 ships a simple and robust connection pool out from the box. This section covers how to use it.
A Simple Example
Import both core and pool namespaces as follows:
(ns demo
(:require
[pg.core :as pg]
[pg.pool :as pool]))
Here is how you use the pool:
(def config
{:host "127.0.0.1"
:port 5432
:user "test"
:password "test"
:database "test"})
(pool/with-pool [pool config]
(pool/with-connection [conn pool]
(pg/execute conn "select 1 as one")))
The pool/with-pool macro creates a pool object from the config map and binds
it to the pool symbol. Once you exit the macro, the pool gets closed.
The with-pool macro can be easily replaced with the with-open macro and the
pool function that creates a pool instance. By exit, the macro calls the
.close method of an opened object, which closes the pool.
(with-open [pool (pool/pool config)]
(pool/with-conn [conn pool]
(pg/execute conn "select 1 as one")))
Having a pool object, use it with the pool/with-connection macro (there is a
shorter version pool/with-conn as well). This macro borrows a connection from
the pool and binds it to the conn symbol. Now you pass the connection to
pg/execute, pg/query and so on. By exiting the with-connection macro, the
connection is returned to the pool.
And this is briefly everything you need to know about the pool! Sections below describe more about its inner state and behavior.
Configuration
The pool object accepts the same config the Connection object does section for
the table of parameters). In addition to these, the fillowing options are
accepted:
| Field | Type | Default | Comment |
|---|---|---|---|
:pool-min-size |
integer | 2 | Minimum number of open connections when initialized. |
:pool-max-size |
integer | 8 | Maximum number of open connections. Cannot be exceeded. |
:pool-expire-threshold-ms |
integer | 300.000 (5 mins) | How soon a connection is treated as expired and will be forcibly closed. |
:pool-borrow-conn-timeout-ms |
integer | 15.000 (15 secs) | How long to wait when borrowing a connection while all the connections are busy. By timeout, an exception is thrown. |
The first option :pool-min-size specifies how many connection are opened at
the beginning. Setting too many is not necessary because you never know if you
application will really use all of them. It’s better to start with a small
number and let the pool to grow in time, if needed.
The next option :pool-max-size determines the total number of open
connections. When set, it cannot be overridden. If all the connections are busy
and there is still a gap, the pool spawns a new connection and adds it to the
internal queue. But if the :pool-max-size value is reached, an exception is
thrown.
The option :pool-expire-threshold-ms specifies the number of
milliseconds. When a certain amount of time has passed since the connection’s
initialization, it is considered expired and will be closed by the pool. This is
used to rotate connections and prevent them from living for too long.
The option :pool-borrow-conn-timeout-ms prescribes how long to wait when
borrowing a connection from an exhausted pool: a pool where all the connections
are busy and the :pool-max-size value is reached. At this case, the only hope
that other clients complete their work and return theri connection before
timeout bangs. Should there still haven’t been any free connections during the
:pool-borrow-conn-timeout-ms time window, an exception pops up.
Pool Methods
The stats function returns info about free and used connections:
(pool/with-pool [pool config]
(pool/stats pool)
;; {:free 1 :used 0}
(pool/with-connection [conn pool]
(pool/stats pool)
;; {:free 0 :used 1}
))
It might be used to send metrics to Grafana, CloudWatch, etc.
Manual Pool Management
The following functions help you manage a connection pool manually, for example when it’s wrapped into a component (see Component and Integrant libraries).
The pool function creates a pool:
(def POOL (pool/pool config))
The used-count and free-count functions return total numbers of busy and
free connections, respectively:
(pool/free-count POOL)
;; 2
(pool/used-count POOL)
;; 0
The pool? predicate ensures it’s a Pool instance indeed:
(pool/pool? POOL)
;; true
Closing
The close method shuts down a pool instance. On shutdown, first, all the free
connections get closed. Then the pool closes busy connections that were
borrowed. This might lead to failures in other threads, so it’s worth waiting
until the pool has zero busy connections.
(pool/close POOL)
;; nil
The closed? predicate ensures the pool has already been closed:
(pool/closed? POOL)
;; true
Borrow Logic in Detail
When getting a connection from a pool, the following conditions are taken into account:
- if the pool is closed, an exception is thrown;
- if there are free connections available, the pool takes one of them;
- if a connection is expired (was created long ago), it’s closed and the pool performs another attempt;
- if there aren’t free connections, but the max number of used connection has not been reached yet, the pool spawns a new connection;
- if the number of used connections is reached, the pool waits for
:pool-borrow-conn-timeout-msamount of milliseconds hoping that someone releases a connection in the background; - by timeout (when nobody did), the pool throws an exception.
Returning Logic in Detail
When you return a connection to a pool, the following cases might come into play:
- if the connection is an error state, then transaction is rolled back, and the connection is closed;
- if the connection is in transaction mode, it is rolled back, and the connection is marked as free again;
- if it was already closed, the pool just removes it from used connections. It won’t be added into the free queue;
- if the pool is closed, the connection is removed from used connections;
- when none of above conditions is met, the connection is removed from used and becomes available for other consumers again.
This was the Connecton Pool section, and now we proceed with Folders.
Folders (Reducers)
Folders (which are also known as reducers) are objects that transform rows from network into something else. A typical folder consists from an initial value (which might be mutable) and logic that adds the next row to that value. Before returning the value, a folder might post-process it somehow, for example turn it into an immutable value.
The default folder (which you don’t need to specify) acts exactly like this: it
spawns a new transient vector and conj!es all the incoming rows into
it. Finally, it returns a persistent! version of this vector.
PG2 provides a great variety of folders: to build maps or sets, to index or group rows by a certain function. With folders, it’s possible to dump a database result into a JSON or EDN file.
It’s quite important that folders process rows on the fly. Like transducers, they don’t keep the whole dataset in memory. They only track the accumulator and the current row no matter how many of them have arrived from the database: one thousand or one million.
A Simple Folder
Technically a folder is a function (an instance of clojure.lang.IFn) with
three bodies of arity 0, 1, and 2, as follows:
(defn a-folder
([]
...)
([acc]
...)
([acc row]
...))
-
The first 0-arity form produces an accumulator that might be mutable.
-
The third 2-arity form takes the accumulator and the current row and returns an updated version of the accumulator.
-
The second 1-arity form accepts the last version of the accumulator and transforms it somehow, for example seals a transient collection into its persistent view.
Here is the default folder:
(defn default
([]
(transient []))
([acc!]
(persistent! acc!))
([acc! row]
(conj! acc! row)))
Some folders depend on initial settings and thus produce folding functions. Here
is an example of the map folder that acts like the map function from
clojure.core:
(defn map
[f]
(fn folder-map
([]
(transient []))
([acc!]
(persistent! acc!))
([acc! row]
(conj! acc! (f row)))))
Passing A Folder
To pass a custom folder to process the result, specify the :as key as follows:
(require '[pg.fold :as fold])
(defn row-sum [{:keys [field_1 field_2]}]
(+ field_1 field_2))
(pg/execute conn query {:as (fold/map row-sum)})
;; [10 53 14 32 ...]
Standard Folders and Aliases
PG provides a number of built-in folders. Some of them are used so often that it’s not needed to pass them explicitly. There are shortcuts that enable certain folders internally. Below, find the actual list of folders, their shortcuts and examples.
Column
Takes a single column from each row returning a plain vector:
(pg/execute conn query {:as (fold/column :id)})
;; [1 2 3 4 ....]
There is an alias :column that accepts a name of the column:
(pg/execute conn query {:column :id})
;; [1 2 3 4 ....]
Map
Acts like the standard map function from clojure.core. Applies a function to
each row and collects a vector of results.
Passing the folder explicitly:
(pg/execute conn query {:as (fold/map func)})
And with an alias:
(pg/execute conn query {:map func})
Default
Collects unmodified rows into a vector. That’s unlikely you’ll need that folder as it gets applied internally when no other folders were specified.
Dummy
A folder that doesn’t accumulate the rows but just skips them and returns nil.
(pg/execute conn query {:as fold/dummy})
nil
First
Perhaps the most needed folder, first returns the first row only and skips the
rest. Pay attention, this folder doesn’t have a state and thus doesn’t need to
be initiated. Useful when you query a single row by its primary key:
(pg/execute conn
"select * from users where id = $1"
{:params [42]
:as fold/first})
{:id 42 :email "test@test.com"}
Or pass the :first (or :first?) option set to true:
(pg/execute conn
"select * from users where id = $1"
{:params [42]
:first true})
{:id 42 :email "test@test.com"}
Index by
Often, you select rows as a vector and build a map like {id => row}, for
example:
(let [rows (jdbc/execute! conn ["select ..."])]
(reduce (fn [acc row]
(assoc acc (:id row) row))
{}
rows))
{1 {:id 1 :name "test1" ...}
2 {:id 2 :name "test2" ...}
3 {:id 3 :name "test3" ...}
...
}
This process is known as indexing because later on, the map is used as an index for quick lookups.
This approach, although is quite common, has flaws. First, you traverse rows
twice: when fetching them from the database, and then again inside
reduce. Second, it takes extra lines of code.
The index-by folder does exactly the same: it accepts a function which is
applied to a row and uses the result as an index key. Most often you pass a
keyword:
(let [query
"with foo (a, b) as (values (1, 2), (3, 4), (5, 6))
select * from foo"
res
(pg/execute conn query {:as (fold/index-by :a)})]
{1 {:a 1 :b 2}
3 {:a 3 :b 4}
5 {:a 5 :b 6}})
The shortcut :index-by accepts a function as well:
(pg/execute conn query {:index-by :a})
Group by
The group-by folder is simlar to index-by but collects multiple rows per a
grouping function. It produces a map like {(f row) => [row1, row2, ...]} where
row1, row2 and the rest return the same value for f.
Imagine each user in the database has a role:
{:id 1 :name "Test1" :role "user"}
{:id 2 :name "Test2" :role "user"}
{:id 3 :name "Test3" :role "admin"}
{:id 4 :name "Test4" :role "owner"}
{:id 5 :name "Test5" :role "admin"}
This is what group-by returns when grouping by the :role field:
(pg/execute conn query {:as (fold/group-by :role)})
{"user"
[{:id 1, :name "Test1", :role "user"}
{:id 2, :name "Test2", :role "user"}]
"admin"
[{:id 3, :name "Test3", :role "admin"}
{:id 5, :name "Test5", :role "admin"}]
"owner"
[{:id 4, :name "Test4", :role "owner"}]}
The folder has its own alias which accepts a function:
(pg/execute conn query {:group-by :role})
KV (Key and Value)
The kv folder accepts two functions: the first one is for a key (fk), and
the second is for a value (fv). Then it produces a map like {(fk row) => (fv
row)}.
A typical example might be a narrower index map. Imagine you select just a
couple of fields, id and email. Now you need a map of {id => email} for
quick email lookup by id. This is where kv does the job for you.
(pg/execute conn
"select id, email from users"
{:as (fold/kv :id :email)})
{1 "ivan@test.com"
2 "hello@gmail.com"
3 "skotobaza@mail.ru"}
The :kv alias accepts a vector of two functions:
(pg/execute conn
"select id, email from users"
{:kv [:id :email]})
Run
The run folder is useful for processing rows with side effects, e.g. printing
them, writing to files, passing via API. A one-argument function passed to run
is applied to each row ignoring the result. The folder counts a total number of
rows being processed.
(defn func [row]
(println "processing row" row)
(send-to-api row))
(pg/execute conn query {:as (fold/run func)})
100 ;; the number of rows processed
An example with an alias:
(pg/execute conn query {:run func})
Table
The table folder returns a plain matrix (a vector of vectors) of database
values. It reminds the columns folder but also keeps column names in the
leading row. Thus, the resulting table always has at least one row (it’s never
empty because of the header). The table view is useful when saving the data into
CSV.
The folder has its inner state and thus needs to be initialized with no parameters:
(pg/execute conn query {:as (fold/table)})
[[:id :email]
[1 "ivan@test.com"]
[2 "skotobaza@mail.ru"]]
The alias :table accepts any non-false value:
(pg/execute conn query {:table true})
[[:id :email]
[1 "ivan@test.com"]
[2 "skotobaza@mail.ru"]]
Java
This folder produces java.util.ArrayList where each row is an instance of
java.util.HashMap. It doesn’t require initialization:
(pg/execute conn query {:as fold/java})
Alias:
(pg/execute conn query {:java true})
Reduce
The reduce folder acts like the same-name function from clojure.core. It
accepts a function and an initial value (accumulator). The function accepts the
accumulator and the current row, and returns an updated version of the
accumulator.
Here is how you collect unique pairs of size and color from the database result:
(defn ->pair [acc {:keys [sku color]}]
(conj acc [a b]))
(pg/execute conn query {:as (fold/reduce ->pair #{})})
#{[:xxl :green]
[:xxl :red]
[:x :red]
[:x :blue]}
The folder ignores reduced logic: it performs iteration until all rows are
consumed. It doesn’t check if the accumulator is wrapped with reduced.
The :reduce alias accepts a vector of a function and an initial value:
(pg/execute conn query {:reduce [->pair #{}]})
Into (Transduce)
This folder mimics the into logic when it deals with an xform, also known as
a transducer. Sometimes, you need to pass the result throughout a bunch of
map/filter/keep functions. Each of them produces an intermediate
collection which is not as fast as it could be with a transducer. Transducers
are designed such that they compose a stack of actions, which, when being run,
does not produce extra collections.
The into folder accepts an xform produced by map/filter/comp,
whatever. It also accepts a persistent collection which acts as an
accumulator. The accumulator gets transformed into a transient view internally
for better performance. The folder uses conj! to push values into the
accumulator, so maps are not acceptable, only vectors, lists, or sets. When the
accumulator is not passed, it’s an empty vector.
Here is a quick example of into in action:
(let [tx
(comp (map :a)
(filter #{1 5})
(map str))
query
"with foo (a, b) as (values (1, 2), (3, 4), (5, 6))
select * from foo"]
(pg/execute conn query {:as (fold/into tx)}))
;; ["1" "5"]
Another case where we pass a non-empty set to collect the values:
(pg/execute conn query {:as (fold/into tx #{:a :b :c})})
;; #{:a :b :c "1" "5"}
The :into alias is a vector where the first item is an xform and the second
is an accumulator:
(pg/execute conn query {:into [tx []]})
To EDN
This folder writes down rows into an EDN file. It accepts an instance of
java.io.Writer which must be opened in advance. The folder doesn’t open nor
close the writer as these actions are beyond its scope. A common pattern is to
wrap pg/execute or pg/query invocations with the with-open macro that
handles closing procedure even in case of an exception.
The folder writes down rows into the writer using pr-str. Each row takes one
line, and the lines are split with \n. The leading line is [, and the
trailing is ].
The result is a number of rows processed. Here is an example of dumping rows into a file called “test.edn”:
(with-open [out (-> "test.edn" io/file io/writer)]
(pg/execute conn query {:as (fold/to-edn out)}))
;; 199
Let’s check the content of the file:
[
{:id 1 :email "test@test.com"}
{:id 2 :email "hello@test.com"}
...
{:id 199 :email "ivan@test.com"}
]
The alias :to-edn accepts a writer object:
(with-open [out (-> "test.edn" io/file io/writer)]
(pg/execute conn query {:to-edn out}))
To JSON
Like to-edn but dumps rows into JSON. Accepts an instance of
java.io.Writer. Writes rows line by line with no pretty printing. Lines are
joined with a comma. The leading and trailing lines are square brackets. The
result is the number of rows put into the writer.
(with-open [out (-> "test.json" io/file io/writer)]
(pg/execute conn query {:as (fold/to-json out)}))
;; 123
The content of the file:
[
{"b":2,"a":1},
{"b":4,"a":3},
// ...
{"b":6,"a":5}
]
The :to-json alias accepts a writer object:
(with-open [out (-> "test.json" io/file io/writer)]
(pg/execute conn query {:to-json out}))
For more details, you’re welcome to the readme file of the repo.
Нашли ошибку? Выделите мышкой и нажмите Ctrl/⌘+Enter