1111import aioprocessing
1212import multiprocessing
1313import logging
14+ import re
1415
1516class MtmTxAggregate (object ):
1617
@@ -26,16 +27,19 @@ def clear_values(self):
2627 def start_tx (self ):
2728 self .start_time = datetime .datetime .now ()
2829
29- def finish_tx (self , name ):
30+ def finish_tx (self , status ):
3031 latency = (datetime .datetime .now () - self .start_time ).total_seconds ()
3132
33+ if "is aborted on node" in status :
34+ status = re .sub (r'MTM-.+\)' , '<censored>' , status )
35+
3236 if latency > self .max_latency :
3337 self .max_latency = latency
3438
35- if name not in self .finish :
36- self .finish [name ] = 1
39+ if status not in self .finish :
40+ self .finish [status ] = 1
3741 else :
38- self .finish [name ] += 1
42+ self .finish [status ] += 1
3943
4044 def as_dict (self ):
4145 return {
@@ -62,6 +66,7 @@ def __init__(self, dsns, n_accounts=100000):
6266 # logging.basicConfig(level=logging.DEBUG)
6367 self .n_accounts = n_accounts
6468 self .dsns = dsns
69+ self .total = 0
6570 self .aggregates = {}
6671 keep_trying (40 , 1 , self .initdb , 'self.initdb' )
6772 self .running = True
@@ -176,11 +181,11 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
176181 # enable_hstore tries to perform select from database
177182 # which in case of select's failure will lead to exception
178183 # and stale connection to the database
179- conn = yield from aiopg .connect (dsn , enable_hstore = False , timeout = 3600 )
180- print ("reconnected" )
184+ conn = yield from aiopg .connect (dsn , enable_hstore = False , timeout = 1 )
185+ print ('Connected %s, %d' % ( aggname_prefix , conn_i + 1 ) )
181186
182187 if (not cur ) or cur .closed :
183- cur = yield from conn .cursor ()
188+ cur = yield from conn .cursor (timeout = 10 )
184189
185190 # ROLLBACK tx after previous exception.
186191 # Doing this here instead of except handler to stay inside try
@@ -198,17 +203,17 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
198203 # Give evloop some free time.
199204 # In case of continuous excetions we can loop here without returning
200205 # back to event loop and block it
201- if "Multimaster node is not online" in msg :
202- yield from asyncio .sleep (1.00 )
203- else :
204- yield from asyncio .sleep (0.01 )
206+ yield from asyncio .sleep (0.5 )
207+
205208 except BaseException as e :
206- print ('Catch exception ' , type (e ))
207- agg .finish_tx (str (e ).strip ())
209+ msg = str (e ).strip ()
210+ agg .finish_tx (msg )
211+ print ('Caught exception %s, %s, %d, %s' % (type (e ), aggname_prefix , conn_i + 1 , msg ) )
212+
208213 # Give evloop some free time.
209214 # In case of continuous excetions we can loop here without returning
210215 # back to event loop and block it
211- yield from asyncio .sleep (0.01 )
216+ yield from asyncio .sleep (0.5 )
212217
213218 print ("We've count to infinity!" )
214219
@@ -235,10 +240,11 @@ def transfer_tx(self, conn, cur, agg):
235240 def total_tx (self , conn , cur , agg ):
236241 yield from cur .execute ('select sum(amount) from bank_test' )
237242 total = yield from cur .fetchone ()
238- if total [0 ] != 0 :
243+ if total [0 ] != self . total :
239244 agg .isolation += 1
240- # print(self.oops)s
241- # print('Isolation error, total = ', total[0])
245+ self .total = total [0 ]
246+ print (self .oops )
247+ print ('Isolation error, total = ' , total [0 ])
242248 # yield from cur.execute('select * from mtm.get_nodes_state()')
243249 # nodes_state = yield from cur.fetchall()
244250 # for i, col in enumerate(self.nodes_state_fields):
0 commit comments