1111
1212#include < pqxx/connection>
1313#include < pqxx/transaction>
14+ #include < pqxx/subtransaction.hxx>
1415#include < pqxx/nontransaction>
1516#include < pqxx/pipeline>
1617
@@ -70,7 +71,8 @@ struct config
7071 vector<string> connections;
7172 bool scatter;
7273 bool avoidDeadlocks;
73-
74+ bool subtransactions;
75+
7476 config () {
7577 nReaders = 1 ;
7678 nWriters = 10 ;
@@ -79,6 +81,7 @@ struct config
7981 updatePercent = 100 ;
8082 scatter = false ;
8183 avoidDeadlocks = false ;
84+ subtransactions = false ;
8285 }
8386};
8487
@@ -159,6 +162,33 @@ void* writer(void* arg)
159162 if (cfg.scatter ) {
160163 srcAcc = srcAcc/cfg.nWriters *cfg.nWriters + t.id ;
161164 dstAcc = dstAcc/cfg.nWriters *cfg.nWriters + t.id ;
165+ } else if (cfg.subtransactions ) {
166+ if (dstAcc < srcAcc) {
167+ int tmp = srcAcc;
168+ srcAcc = dstAcc;
169+ dstAcc = tmp;
170+ }
171+ while (true ) {
172+ try {
173+ subtransaction subtxn (txn, " withdraw" );
174+ exec (subtxn, " update t set v = v - 1 where u=%d" , srcAcc);
175+ break ;
176+ } catch (pqxx_exception const & x) {
177+ t.aborts += 1 ;
178+ }
179+ }
180+ while (true ) {
181+ try {
182+ subtransaction subtxn (txn, " deposit" );
183+ exec (subtxn, " update t set v = v + 1 where u=%d" , dstAcc);
184+ break ;
185+ } catch (pqxx_exception const & x) {
186+ t.aborts += 1 ;
187+ }
188+ }
189+ txn.commit ();
190+ t.transactions += 1 ;
191+ continue ;
162192 } else if (cfg.avoidDeadlocks ) {
163193 if (dstAcc < srcAcc) {
164194 int tmp = srcAcc;
@@ -198,8 +228,8 @@ void initializeDatabase()
198228 printf (" Creating database schema...\n " );
199229 {
200230 nontransaction txn (conn);
201- exec (txn, " drop extension if exists multimaster" );
202- exec (txn, " create extension multimaster" );
231+ // exec(txn, "drop extension if exists multimaster");
232+ // exec(txn, "create extension multimaster");
203233 exec (txn, " drop table if exists t" );
204234 exec (txn, " create table t(u int primary key, v int)" );
205235 }
@@ -251,6 +281,9 @@ int main (int argc, char* argv[])
251281 case ' d' :
252282 cfg.avoidDeadlocks = true ;
253283 continue ;
284+ case ' x' :
285+ cfg.subtransactions = true ;
286+ continue ;
254287 }
255288 }
256289 printf (" Options:\n "
@@ -260,7 +293,8 @@ int main (int argc, char* argv[])
260293 " \t -n N\t number of iterations (1000)\n "
261294 " \t -p N\t update percent (100)\n "
262295 " \t -c STR\t database connection string\n "
263- " \t -s\t scattern avoid deadlocks\n "
296+ " \t -s\t scatter ids to avoid conflicts\n "
297+ " \t -x\t use subtransactions\n "
264298 " \t -d\t avoid deadlocks\n "
265299 " \t -i\t initialize database\n " );
266300 return 1 ;
0 commit comments