PG2 version 0.1.15 is out. This version mostly ships improvements to connection pool and folders (reducers) of a database result. There are two new sections in the documentation that describe each part. I reproduce them below.

Connection Pool

Problem: every time you connect to the database, it takes time to open a socket, pass authentication pipeline and receive initial data from the server. From the server’s prospective, a new connection spawns a new process which is also an expensive operation. If you open a connection per a query, your application is about ten times slower than it could be.

Connection pools solve that problem. A pool holds a set of connections opened in advance, and you borrow them from a pool. When borrowed, a connection cannot be shared with somebody else any longer. Once you’ve done with your work, you return the connection to the pool, and it’s available for other consumers.

PG2 ships a simple and robust connection pool out from the box. This section covers how to use it.

A Simple Example

Import both core and pool namespaces as follows:

(ns demo
  (:require
    [pg.core :as pg]
    [pg.pool :as pool]))

Here is how you use the pool:

(def config
  {:host "127.0.0.1"
   :port 5432
   :user "test"
   :password "test"
   :database "test"})

(pool/with-pool [pool config]
  (pool/with-connection [conn pool]
    (pg/execute conn "select 1 as one")))

The pool/with-pool macro creates a pool object from the config map and binds it to the pool symbol. Once you exit the macro, the pool gets closed.

The with-pool macro can be easily replaced with the with-open macro and the pool function that creates a pool instance. By exit, the macro calls the .close method of an opened object, which closes the pool.

(with-open [pool (pool/pool config)]
  (pool/with-conn [conn pool]
    (pg/execute conn "select 1 as one")))

Having a pool object, use it with the pool/with-connection macro (there is a shorter version pool/with-conn as well). This macro borrows a connection from the pool and binds it to the conn symbol. Now you pass the connection to pg/execute, pg/query and so on. By exiting the with-connection macro, the connection is returned to the pool.

And this is briefly everything you need to know about the pool! Sections below describe more about its inner state and behavior.

Configuration

The pool object accepts the same config the Connection object does section for the table of parameters). In addition to these, the fillowing options are accepted:

Field Type Default Comment
:pool-min-size integer 2 Minimum number of open connections when initialized.
:pool-max-size integer 8 Maximum number of open connections. Cannot be exceeded.
:pool-expire-threshold-ms integer 300.000 (5 mins) How soon a connection is treated as expired and will be forcibly closed.
:pool-borrow-conn-timeout-ms integer 15.000 (15 secs) How long to wait when borrowing a connection while all the connections are busy. By timeout, an exception is thrown.

The first option :pool-min-size specifies how many connection are opened at the beginning. Setting too many is not necessary because you never know if you application will really use all of them. It’s better to start with a small number and let the pool to grow in time, if needed.

The next option :pool-max-size determines the total number of open connections. When set, it cannot be overridden. If all the connections are busy and there is still a gap, the pool spawns a new connection and adds it to the internal queue. But if the :pool-max-size value is reached, an exception is thrown.

The option :pool-expire-threshold-ms specifies the number of milliseconds. When a certain amount of time has passed since the connection’s initialization, it is considered expired and will be closed by the pool. This is used to rotate connections and prevent them from living for too long.

The option :pool-borrow-conn-timeout-ms prescribes how long to wait when borrowing a connection from an exhausted pool: a pool where all the connections are busy and the :pool-max-size value is reached. At this case, the only hope that other clients complete their work and return theri connection before timeout bangs. Should there still haven’t been any free connections during the :pool-borrow-conn-timeout-ms time window, an exception pops up.

Pool Methods

The stats function returns info about free and used connections:

(pool/with-pool [pool config]

  (pool/stats pool)
  ;; {:free 1 :used 0}

  (pool/with-connection [conn pool]
    (pool/stats pool)
    ;; {:free 0 :used 1}
  ))

It might be used to send metrics to Grafana, CloudWatch, etc.

Manual Pool Management

The following functions help you manage a connection pool manually, for example when it’s wrapped into a component (see Component and Integrant libraries).

The pool function creates a pool:

(def POOL (pool/pool config))

The used-count and free-count functions return total numbers of busy and free connections, respectively:

(pool/free-count POOL)
;; 2

(pool/used-count POOL)
;; 0

The pool? predicate ensures it’s a Pool instance indeed:

(pool/pool? POOL)
;; true

Closing

The close method shuts down a pool instance. On shutdown, first, all the free connections get closed. Then the pool closes busy connections that were borrowed. This might lead to failures in other threads, so it’s worth waiting until the pool has zero busy connections.

(pool/close POOL)
;; nil

The closed? predicate ensures the pool has already been closed:

(pool/closed? POOL)
;; true

Borrow Logic in Detail

When getting a connection from a pool, the following conditions are taken into account:

  • if the pool is closed, an exception is thrown;
  • if there are free connections available, the pool takes one of them;
  • if a connection is expired (was created long ago), it’s closed and the pool performs another attempt;
  • if there aren’t free connections, but the max number of used connection has not been reached yet, the pool spawns a new connection;
  • if the number of used connections is reached, the pool waits for :pool-borrow-conn-timeout-ms amount of milliseconds hoping that someone releases a connection in the background;
  • by timeout (when nobody did), the pool throws an exception.

Returning Logic in Detail

When you return a connection to a pool, the following cases might come into play:

  • if the connection is an error state, then transaction is rolled back, and the connection is closed;
  • if the connection is in transaction mode, it is rolled back, and the connection is marked as free again;
  • if it was already closed, the pool just removes it from used connections. It won’t be added into the free queue;
  • if the pool is closed, the connection is removed from used connections;
  • when none of above conditions is met, the connection is removed from used and becomes available for other consumers again.

This was the Connecton Pool section, and now we proceed with Folders.

Folders (Reducers)

Folders (which are also known as reducers) are objects that transform rows from network into something else. A typical folder consists from an initial value (which might be mutable) and logic that adds the next row to that value. Before returning the value, a folder might post-process it somehow, for example turn it into an immutable value.

The default folder (which you don’t need to specify) acts exactly like this: it spawns a new transient vector and conj!es all the incoming rows into it. Finally, it returns a persistent! version of this vector.

PG2 provides a great variety of folders: to build maps or sets, to index or group rows by a certain function. With folders, it’s possible to dump a database result into a JSON or EDN file.

It’s quite important that folders process rows on the fly. Like transducers, they don’t keep the whole dataset in memory. They only track the accumulator and the current row no matter how many of them have arrived from the database: one thousand or one million.

A Simple Folder

Technically a folder is a function (an instance of clojure.lang.IFn) with three bodies of arity 0, 1, and 2, as follows:

(defn a-folder
  ([]
   ...)
  ([acc]
   ...)
  ([acc row]
   ...))
  • The first 0-arity form produces an accumulator that might be mutable.

  • The third 2-arity form takes the accumulator and the current row and returns an updated version of the accumulator.

  • The second 1-arity form accepts the last version of the accumulator and transforms it somehow, for example seals a transient collection into its persistent view.

Here is the default folder:

(defn default
  ([]
   (transient []))
  ([acc!]
   (persistent! acc!))
  ([acc! row]
   (conj! acc! row)))

Some folders depend on initial settings and thus produce folding functions. Here is an example of the map folder that acts like the map function from clojure.core:

(defn map
  [f]
  (fn folder-map
    ([]
     (transient []))
    ([acc!]
     (persistent! acc!))
    ([acc! row]
     (conj! acc! (f row)))))

Passing A Folder

To pass a custom folder to process the result, specify the :as key as follows:

(require '[pg.fold :as fold])

(defn row-sum [{:keys [field_1 field_2]}]
  (+ field_1 field_2))

(pg/execute conn query {:as (fold/map row-sum)})

;; [10 53 14 32 ...]

Standard Folders and Aliases

PG provides a number of built-in folders. Some of them are used so often that it’s not needed to pass them explicitly. There are shortcuts that enable certain folders internally. Below, find the actual list of folders, their shortcuts and examples.

Column

Takes a single column from each row returning a plain vector:

(pg/execute conn query {:as (fold/column :id)})

;; [1 2 3 4 ....]

There is an alias :column that accepts a name of the column:

(pg/execute conn query {:column :id})
;; [1 2 3 4 ....]

Map

Acts like the standard map function from clojure.core. Applies a function to each row and collects a vector of results.

Passing the folder explicitly:

(pg/execute conn query {:as (fold/map func)})

And with an alias:

(pg/execute conn query {:map func})

Default

Collects unmodified rows into a vector. That’s unlikely you’ll need that folder as it gets applied internally when no other folders were specified.

Dummy

A folder that doesn’t accumulate the rows but just skips them and returns nil.

(pg/execute conn query {:as fold/dummy})

nil

First

Perhaps the most needed folder, first returns the first row only and skips the rest. Pay attention, this folder doesn’t have a state and thus doesn’t need to be initiated. Useful when you query a single row by its primary key:

(pg/execute conn
            "select * from users where id = $1"
            {:params [42]
             :as fold/first})

{:id 42 :email "test@test.com"}

Or pass the :first (or :first?) option set to true:

(pg/execute conn
            "select * from users where id = $1"
            {:params [42]
             :first true})

{:id 42 :email "test@test.com"}

Index by

Often, you select rows as a vector and build a map like {id => row}, for example:

(let [rows (jdbc/execute! conn ["select ..."])]
  (reduce (fn [acc row]
            (assoc acc (:id row) row))
          {}
          rows))

{1 {:id 1 :name "test1" ...}
 2 {:id 2 :name "test2" ...}
 3 {:id 3 :name "test3" ...}
 ...
 }

This process is known as indexing because later on, the map is used as an index for quick lookups.

This approach, although is quite common, has flaws. First, you traverse rows twice: when fetching them from the database, and then again inside reduce. Second, it takes extra lines of code.

The index-by folder does exactly the same: it accepts a function which is applied to a row and uses the result as an index key. Most often you pass a keyword:

(let [query
      "with foo (a, b) as (values (1, 2), (3, 4), (5, 6))
      select * from foo"

      res
      (pg/execute conn query {:as (fold/index-by :a)})]

{1 {:a 1 :b 2}
 3 {:a 3 :b 4}
 5 {:a 5 :b 6}})

The shortcut :index-by accepts a function as well:

(pg/execute conn query {:index-by :a})

Group by

The group-by folder is simlar to index-by but collects multiple rows per a grouping function. It produces a map like {(f row) => [row1, row2, ...]} where row1, row2 and the rest return the same value for f.

Imagine each user in the database has a role:

{:id 1 :name "Test1" :role "user"}
{:id 2 :name "Test2" :role "user"}
{:id 3 :name "Test3" :role "admin"}
{:id 4 :name "Test4" :role "owner"}
{:id 5 :name "Test5" :role "admin"}

This is what group-by returns when grouping by the :role field:

(pg/execute conn query {:as (fold/group-by :role)})

{"user"
 [{:id 1, :name "Test1", :role "user"}
  {:id 2, :name "Test2", :role "user"}]

 "admin"
 [{:id 3, :name "Test3", :role "admin"}
  {:id 5, :name "Test5", :role "admin"}]

 "owner"
 [{:id 4, :name "Test4", :role "owner"}]}

The folder has its own alias which accepts a function:

(pg/execute conn query {:group-by :role})

KV (Key and Value)

The kv folder accepts two functions: the first one is for a key (fk), and the second is for a value (fv). Then it produces a map like {(fk row) => (fv row)}.

A typical example might be a narrower index map. Imagine you select just a couple of fields, id and email. Now you need a map of {id => email} for quick email lookup by id. This is where kv does the job for you.

(pg/execute conn
            "select id, email from users"
            {:as (fold/kv :id :email)})

{1 "ivan@test.com"
 2 "hello@gmail.com"
 3 "skotobaza@mail.ru"}

The :kv alias accepts a vector of two functions:

(pg/execute conn
            "select id, email from users"
            {:kv [:id :email]})

Run

The run folder is useful for processing rows with side effects, e.g. printing them, writing to files, passing via API. A one-argument function passed to run is applied to each row ignoring the result. The folder counts a total number of rows being processed.

(defn func [row]
  (println "processing row" row)
  (send-to-api row))

(pg/execute conn query {:as (fold/run func)})

100 ;; the number of rows processed

An example with an alias:

(pg/execute conn query {:run func})

Table

The table folder returns a plain matrix (a vector of vectors) of database values. It reminds the columns folder but also keeps column names in the leading row. Thus, the resulting table always has at least one row (it’s never empty because of the header). The table view is useful when saving the data into CSV.

The folder has its inner state and thus needs to be initialized with no parameters:

(pg/execute conn query {:as (fold/table)})

[[:id :email]
 [1 "ivan@test.com"]
 [2 "skotobaza@mail.ru"]]

The alias :table accepts any non-false value:

(pg/execute conn query {:table true})

[[:id :email]
 [1 "ivan@test.com"]
 [2 "skotobaza@mail.ru"]]

Java

This folder produces java.util.ArrayList where each row is an instance of java.util.HashMap. It doesn’t require initialization:

(pg/execute conn query {:as fold/java})

Alias:

(pg/execute conn query {:java true})

Reduce

The reduce folder acts like the same-name function from clojure.core. It accepts a function and an initial value (accumulator). The function accepts the accumulator and the current row, and returns an updated version of the accumulator.

Here is how you collect unique pairs of size and color from the database result:

(defn ->pair [acc {:keys [sku color]}]
  (conj acc [a b]))

(pg/execute conn query {:as (fold/reduce ->pair #{})})

#{[:xxl :green]
  [:xxl :red]
  [:x :red]
  [:x :blue]}

The folder ignores reduced logic: it performs iteration until all rows are consumed. It doesn’t check if the accumulator is wrapped with reduced.

The :reduce alias accepts a vector of a function and an initial value:

(pg/execute conn query {:reduce [->pair #{}]})

Into (Transduce)

This folder mimics the into logic when it deals with an xform, also known as a transducer. Sometimes, you need to pass the result throughout a bunch of map/filter/keep functions. Each of them produces an intermediate collection which is not as fast as it could be with a transducer. Transducers are designed such that they compose a stack of actions, which, when being run, does not produce extra collections.

The into folder accepts an xform produced by map/filter/comp, whatever. It also accepts a persistent collection which acts as an accumulator. The accumulator gets transformed into a transient view internally for better performance. The folder uses conj! to push values into the accumulator, so maps are not acceptable, only vectors, lists, or sets. When the accumulator is not passed, it’s an empty vector.

Here is a quick example of into in action:

(let [tx
      (comp (map :a)
            (filter #{1 5})
            (map str))

      query
      "with foo (a, b) as (values (1, 2), (3, 4), (5, 6))
       select * from foo"]

  (pg/execute conn query {:as (fold/into tx)}))

;; ["1" "5"]

Another case where we pass a non-empty set to collect the values:

(pg/execute conn query {:as (fold/into tx #{:a :b :c})})

;; #{:a :b :c "1" "5"}

The :into alias is a vector where the first item is an xform and the second is an accumulator:

(pg/execute conn query {:into [tx []]})

To EDN

This folder writes down rows into an EDN file. It accepts an instance of java.io.Writer which must be opened in advance. The folder doesn’t open nor close the writer as these actions are beyond its scope. A common pattern is to wrap pg/execute or pg/query invocations with the with-open macro that handles closing procedure even in case of an exception.

The folder writes down rows into the writer using pr-str. Each row takes one line, and the lines are split with \n. The leading line is [, and the trailing is ].

The result is a number of rows processed. Here is an example of dumping rows into a file called “test.edn”:

(with-open [out (-> "test.edn" io/file io/writer)]
  (pg/execute conn query {:as (fold/to-edn out)}))

;; 199

Let’s check the content of the file:

[
  {:id 1 :email "test@test.com"}
  {:id 2 :email "hello@test.com"}
  ...
  {:id 199 :email "ivan@test.com"}
]

The alias :to-edn accepts a writer object:

(with-open [out (-> "test.edn" io/file io/writer)]
  (pg/execute conn query {:to-edn out}))

To JSON

Like to-edn but dumps rows into JSON. Accepts an instance of java.io.Writer. Writes rows line by line with no pretty printing. Lines are joined with a comma. The leading and trailing lines are square brackets. The result is the number of rows put into the writer.

(with-open [out (-> "test.json" io/file io/writer)]
  (pg/execute conn query {:as (fold/to-json out)}))

;; 123

The content of the file:

[
  {"b":2,"a":1},
  {"b":4,"a":3},
  // ...
  {"b":6,"a":5}
]

The :to-json alias accepts a writer object:

(with-open [out (-> "test.json" io/file io/writer)]
  (pg/execute conn query {:to-json out}))

For more details, you’re welcome to the readme file of the repo.