2929import weakref
3030import pickle
3131import inspect
32- from io import StringIO
32+ import io
3333import contextlib
3434import gc
3535
3636# emit warnings about uncollectable objects
3737gc .set_debug (gc .DEBUG_UNCOLLECTABLE )
3838try :
39- long
39+ long # @UndefinedVariable
4040except NameError :
4141 long = int # @ReservedAssignment
4242
5050@contextlib .contextmanager
5151def captured_stderr ():
5252 old = sys .stderr
53- new = StringIO ()
53+ new = io . StringIO ()
5454 sys .stderr = new
5555 try :
5656 yield new
@@ -172,6 +172,77 @@ def __init__(cls, *args, **kw):
172172
173173class StacklessTestCase (unittest .TestCase , StacklessTestCaseMixin , metaclass = StacklessTestCaseMeta ):
174174
175+ @classmethod
176+ def prepare_test_method (cls , func , name ):
177+ """Called after class creation
178+
179+ This method creates the _H methods, which run without
180+ soft switching
181+ """
182+ if hasattr (func , "enable_softswitch" ) and not getattr (func , "_H_created" , False ):
183+ return ((func , name ), )
184+
185+ def wrapper_hardswitch (self , method = func ):
186+ self .assertTrue (self .__setup_called , "Broken test case: it didn't call super(..., self).setUp()" )
187+ self .assertFalse (stackless .enable_softswitch (None ), "softswitch is enabled" )
188+ return method (self )
189+ wrapper_hardswitch .enable_softswitch = False
190+ wrapper_hardswitch .__name__ = name + "_H"
191+ if func .__doc__ :
192+ doc = func .__doc__
193+ if doc .startswith ("(soft) " ):
194+ doc = doc [7 :]
195+ wrapper_hardswitch .__doc__ = "(hard) " + doc
196+ setattr (cls , wrapper_hardswitch .__name__ , wrapper_hardswitch )
197+
198+ if not hasattr (func , "_H_created" ):
199+ func ._H_created = True
200+ func .enable_softswitch = True
201+ if func .__doc__ :
202+ func .__doc__ = "(soft) " + func .__doc__
203+ return ((func , name ), (wrapper_hardswitch , wrapper_hardswitch .__name__ ))
204+
205+ @classmethod
206+ def prepare_pickle_test_method (cls , func , name = None ):
207+ """Called after class creation
208+
209+ This method creates the Py0...n C0...n methods, which run with
210+ the Python or C implementation of the enumerated pickle protocol.
211+
212+ This method also acts as a method decorator.
213+ """
214+ if name is None :
215+ # used as a decorator
216+ func .prepare = cls .prepare_pickle_test_method
217+ return func
218+
219+ if hasattr (func , "_pickle_created" ):
220+ return StacklessTestCase .prepare_test_method .__func__ (cls , func , name )
221+ setattr (cls , name , None )
222+ r = []
223+ for i in range (0 , pickle .HIGHEST_PROTOCOL + 1 ):
224+ for p_letter in ("C" , "P" ):
225+ def test (self , method = func , proto = i , pickle_module = p_letter , unpickle_module = p_letter ):
226+ self .assertTrue (self ._StacklessTestCase__setup_called , "Broken test case: it didn't call super(..., self).setUp()" )
227+ self ._pickle_protocol = proto
228+ self ._pickle_module = pickle_module
229+ self ._unpickle_module = unpickle_module
230+ return method (self )
231+ if i == 0 and p_letter == "C" :
232+ test .__name__ = name
233+ else :
234+ test .__name__ = "{:s}_{:s}{:d}" .format (name , p_letter , i )
235+ test ._pickle_created = True
236+ if func .__doc__ :
237+ doc = func .__doc__
238+ match = re .match (r"\([PC][0-{:d}]\)" .format (pickle .HIGHEST_PROTOCOL ), doc )
239+ if match :
240+ doc = match .string [match .end ():]
241+ test .__doc__ = "({:s}{:d}) {:s}" .format (p_letter , i , doc )
242+ setattr (cls , test .__name__ , test )
243+ r .extend (StacklessTestCase .prepare_test_method .__func__ (cls , test , test .__name__ ))
244+ return r
245+
175246 @classmethod
176247 def prepare_test_methods (cls ):
177248 """Called after class creation
@@ -183,29 +254,10 @@ def prepare_test_methods(cls):
183254 for n in names :
184255 m = getattr (cls , n )
185256 if inspect .ismethod (m ):
186- m = m .im_func
187- if hasattr (m , "enable_softswitch" ) and not getattr (m , "_H_created" , False ):
188- continue
189-
190- def wrapper_hardswitch (self , method = m ):
191- self .assertTrue (self .__setup_called , "Broken test case: it didn't call super(..., self).setUp()" )
192- self .assertFalse (stackless .enable_softswitch (None ), "softswitch is enabled" )
193- return method (self )
194- wrapper_hardswitch .enable_softswitch = False
195- wrapper_hardswitch .__name__ = n + "_H"
196- if m .__doc__ :
197- doc = m .__doc__
198- if doc .startswith ("(soft) " ):
199- doc = doc [7 :]
200- wrapper_hardswitch .__doc__ = "(hard) " + doc
201- setattr (cls , wrapper_hardswitch .__name__ , wrapper_hardswitch )
202-
203- if hasattr (m , "_H_created" ):
204- continue
205- m ._H_created = True
206- m .enable_softswitch = True
207- if m .__doc__ :
208- m .__doc__ = "(soft) " + m .__doc__
257+ m = m .__func__
258+ prepare = getattr (m , "prepare" , cls .prepare_test_method )
259+ for x in prepare .__func__ (cls , m , n ):
260+ pass
209261
210262 __setup_called = False
211263 __preexisting_threads = None
@@ -233,7 +285,7 @@ def setUpStacklessTestCase(self):
233285
234286 This method must be called from :meth:`setUp`.
235287 """
236- self .__setup_called = True
288+ self ._StacklessTestCase__setup_called = True
237289 self .addCleanup (stackless .enable_softswitch , stackless .enable_softswitch (self .__enable_softswitch ))
238290
239291 self .__active_test_cases [id (self )] = self
@@ -278,6 +330,20 @@ def tearDown(self):
278330 self .assertEqual (active_count , expected_thread_count , "Leakage from other threads, with %d threads running (%d expected)" % (active_count , expected_thread_count ))
279331 gc .collect () # emits warnings about uncollectable objects after each test
280332
333+ def dumps (self , obj , protocol = None , * , fix_imports = True ):
334+ if self ._pickle_module == "P" :
335+ return pickle ._dumps (obj , protocol = protocol , fix_imports = fix_imports )
336+ elif self ._pickle_module == "C" :
337+ return pickle .dumps (obj , protocol = protocol , fix_imports = fix_imports )
338+ raise ValueError ("Invalid pickle module" )
339+
340+ def loads (self , s , * , fix_imports = True , encoding = "ASCII" , errors = "strict" ):
341+ if self ._pickle_module == "P" :
342+ return pickle ._loads (s , fix_imports = fix_imports , encoding = encoding , errors = errors )
343+ elif self ._pickle_module == "C" :
344+ return pickle .loads (s , fix_imports = fix_imports , encoding = encoding , errors = errors )
345+ raise ValueError ("Invalid pickle module" )
346+
281347 # limited pickling support for test cases
282348 # Between setUp() and tearDown() the test-case has a
283349 # working __reduce__ method. Later the test case gets pickled by
@@ -331,6 +397,14 @@ def _addSkip(self, result, reason):
331397 del _tc
332398
333399
400+ class StacklessPickleTestCase (StacklessTestCase ):
401+ """A test case class for pickle tests"""
402+
403+ @classmethod
404+ def prepare_test_method (cls , func , name ):
405+ return cls .prepare_pickle_test_method (func , name )
406+
407+
334408def restore_testcase_from_id (id_ ):
335409 return StacklessTestCase .restore_testcase_from_id (id_ )
336410
0 commit comments