@@ -204,6 +204,11 @@ def create_pipeline(task):
204204 return pipe
205205
206206
207+ def remove_pipeline (task ):
208+ key = key_from_task (task )
209+ del __cache_transform_pipeline_by_task [key ]
210+
211+
207212def transform_using (pipeline , args , inputs ):
208213 args = orjson .loads (args )
209214 inputs = orjson .loads (inputs )
@@ -215,12 +220,16 @@ def transform_using(pipeline, args, inputs):
215220 return orjson .dumps (pipeline (inputs , ** args ), default = orjson_default ).decode ()
216221
217222
223+ def key_from_task (task ):
224+ return "," .join ([f"{ key } :{ val } " for (key , val ) in sorted (task .items ())])
225+
226+
218227def transform (task , args , inputs ):
219228 task = orjson .loads (task )
220229 args = orjson .loads (args )
221230 inputs = orjson .loads (inputs )
222231
223- key = "," . join ([ f" { key } : { val } " for ( key , val ) in sorted ( task . items ())] )
232+ key = key_from_task ( task )
224233 if key not in __cache_transform_pipeline_by_task :
225234 pipe = create_pipeline (task )
226235 __cache_transform_pipeline_by_task [key ] = pipe
0 commit comments