PHP小编西瓜提醒:在使用 Spring Batch 时,可能会遇到一个问题:当尝试同时启动两个或多个 Spring Batch 作业时,会抛出错误 ORA-08177: 无法序列化此事务的访问。这个错误可能让人困惑,但实际上它源于数据库事务隔离:多个并发事务在可串行化(SERIALIZABLE)隔离级别下同时写入作业元数据表时发生冲突。在解决这个问题之前,我们需要了解一些关于 Spring Batch 和数据库事务的背景知识。
问题内容
尝试使用 CompletableFuture 并行运行两个 Spring Batch 作业时遇到错误。错误信息如下:
originalSql = insert into BATCH_JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION) values (?, ?, ?, ?), error msg = ORA-08177: can't serialize access for this transaction
登录后复制
我正在使用 Spring Batch 5.x、Spring Boot 3.1.6、JDK 17
application.properties(应用程序属性)
# One property per line; the published snippet had all four fused into a single line,
# which a properties parser reads as one key with one long value.
# NOTE(review): the first two keys do not appear among Spring Boot's documented
# spring.batch.* properties; the documented key for the create-isolation level is
# spring.batch.jdbc.isolation-level-for-create — verify against the Spring Boot reference.
spring.batch.repository.isolationlevelforcreate=isolation_read_committed
spring.batch.isolationlevel=read_committed
spring.batch.jdbc.table-prefix=batch_
spring.batch.job.enabled=false
登录后复制
BatchConfig.java
@configurationpublic class batchconfig {@bean("simpletaskexecutor") public taskexecutor simpletaskexecutor() { simpleasynctaskexecutor asynctaskexecutor = new simpleasynctaskexecutor("simpletaskexecutor"); asynctaskexecutor.setconcurrencylimit(concurrencycount); return asynctaskexecutor; }@bean("joblauncherasync") @scope("prototype") public joblauncher joblauncherasync(datasource datasource, jobrepository jobrepository) throws exception { taskexecutorjoblauncher joblauncher = new taskexecutorjoblauncher(); joblauncher.setjobrepository(jobrepository); joblauncher.settaskexecutor(simpletaskexecutor()); joblauncher.afterpropertiesset(); return joblauncher; } @bean public jpatransactionmanager transactionmanager(entitymanagerfactory entitymanagerfactory) { return new jpatransactionmanager(entitymanagerfactory); } @bean public batchjobexecutionlistener batchjobexecutionlistener() { return new batchjobexecutionlistener(); } @bean public batchjobstepexecutionlistner batchjobstepexecutionlistner() { return new batchjobstepexecutionlistner(); }
登录后复制
}
EmployeeJobConfig.java
@configuration@import(batchconfig.class)public class employeejobconfig{ @autowired employeestatuswritter employeestatuswritter; @autowired employeependingprocessor employeependingprocessor; @bean("employeependingreader") public jpapagingitemreader employeependingreader(datasource ds,entitymanagerfactory entitymanagerfactory) { jpapagingitemreader jpareader = new jpapagingitemreader(); jpareader.setentitymanagerfactory(entitymanagerfactory); jpareader.setquerystring("select e from employee e"); jpareader.setpagesize(5000); return jpareader; } @bean("employeespinvokestep") public step employeespinvokestep(@qualifier("employeependingreader") itemreader reader, @qualifier("employeestatuswritter") itemwriter writer, @qualifier("employeependingprocessor") itemprocessor processor,jobrepository jobrepository, platformtransactionmanager transactionmanager,batchjobstepexecutionlistner batchjobstepexecutionlistner,taskexecutor simpletaskexecutor) { return new stepbuilder("employeespinvokestep",jobrepository) .chunk(50,transactionmanager).reader(reader) .processor(processor).writer(writer) .listener(batchjobstepexecutionlistner) .taskexecutor(simpletaskexecutor) .build(); } @bean("employeespjob") public job employeespjob(@qualifier("employeespinvokestep") step employeespinvokestep,jobrepository jobrepository,batchjobexecutionlistener batchjobexecutionlistener) { return new jobbuilder("employeespjob",jobrepository) .incrementer(new runidincrementer()) .listener(batchjobexecutionlistener) .start(employeespinvokestep) .build(); }}
登录后复制
ManagerConfig.java
@configuration@import(batchconfig.class)public class managerconfig { @autowired managerstatuswritter managerstatuswritter; @autowired managerpendingprocessor managerpendingprocessor; @bean("managerpendingreader") public jpapagingitemreader managerpendingreader(datasource ds,entitymanagerfactory entitymanagerfactory) { jpapagingitemreader jpareader = new jpapagingitemreader(); jpareader.setentitymanagerfactory(entitymanagerfactory); jpareader.setquerystring("select m from manager m"); jpareader.setpagesize(5000); return jpareader; } @bean("managerspinvokestep") public step indvinvoiceconsctlspinvokestep(@qualifier("managerpendingreader") itemreader reader, @qualifier("managerstatuswritter") itemwriter writer, @qualifier("managerpendingprocessor") itemprocessor processor,jobrepository jobrepository, platformtransactionmanager transactionmanager,batchjobstepexecutionlistner batchjobstepexecutionlistner,taskexecutor simpletaskexecutor) { return new stepbuilder("managerspinvokestep",jobrepository) .chunk(5000,transactionmanager).reader(reader) .processor(processor).writer(writer) .listener(batchjobstepexecutionlistner) .taskexecutor(simpletaskexecutor) .build(); } @bean("managerspjob") public job managerspjob(@qualifier("managerspinvokestep") step indvinvoiceconsctlspinvokestep,jobrepository jobrepository,batchjobexecutionlistener batchjobexecutionlistener) { return new jobbuilder("managerspjob",jobrepository) .incrementer(new runidincrementer()) .listener(batchjobexecutionlistener) .start(indvinvoiceconsctlspinvokestep) .build(); } }
登录后复制
BatchJobManager.java
@servicepublic class batchjobmanager { @autowired applicationcontext context; @autowired batchexecutorservice batchexecutorservice; @autowired batchjobrunner batchjobrunner; public void startjob() { try { system.out.println("batchjobmanager called .. "+new date()); string[] invoicenames={"employeespjob","managerspjob"}; list invoicenameslist = arrays.aslist(invoicenames); launchasyn(getbatchjoblist(invoicenameslist)); } catch (exception e) { system.out.println("while loading job.."); e.printstacktrace(); } } public list getbatchjoblist(list jobnames) throws exception{ list batchjoblist=new arraylist(); for(string job:jobnames) { batchjob batchjob= batchjob.builder().jobname(invoicejob).build(); batchjoblist.add(batchjob); } return batchjoblist; } public void launchasyn( list batchjoblist) throws exception{ list> batchjobfuturelist = new arraylist>(); for(batchjob batchjob:batchjoblist) { completablefuture jobfuture = batchexecutorservice.execute(batchjob, asynctaskexecutor); batchjobfuturelist.add(jobfuture); } completablefuture jobfutureresult = completablefuture .allof(batchjobfuturelist.toarray(new completablefuture[batchjobfuturelist.size()])); completablefuture> allcompletablefuture = jobfutureresult.thenapply(future -> { return batchjobfuturelist.stream().map(completablefuture -> completablefuture.join()) .collect(collectors.tolist()); }); list resultfuturelist=allcompletablefuture.get(); for(batchjob batch:resultfuturelist) { system.out.println("status "+batch.getiscompleted()); } } }
登录后复制
BatchExecutorService.java
/**
 * Thin service that runs a {@link BatchJob} asynchronously on a caller-supplied executor.
 *
 * <p>Identifier casing and the stripped generic return type have been restored; the
 * published snippet was lowercased and would not compile.
 */
@Service
public class BatchExecutorService {

    @Autowired
    BatchJobRunner batchJobRunner;

    /**
     * Submits the job to {@code threadPoolTaskExecutor} via {@link CompletableFuture#supplyAsync}.
     *
     * @param canBatchJob            job descriptor to run; typed as {@link BatchJob} because
     *                               that is what {@code BatchJobRunner.execute} accepts and
     *                               what the caller passes
     * @param threadPoolTaskExecutor executor the job runner is invoked on
     * @return a future completing with the value returned by {@code batchJobRunner.execute}
     */
    public CompletableFuture<BatchJob> execute(BatchJob canBatchJob,
            TaskExecutor threadPoolTaskExecutor) {
        return CompletableFuture.supplyAsync(
                () -> batchJobRunner.execute(canBatchJob), threadPoolTaskExecutor);
    }
}
登录后复制
BatchJobRunner.java
/**
 * Looks up a job bean by name and launches it through the {@code joblauncherasync} launcher.
 *
 * <p>Identifier casing has been restored (the published snippet was lowercased and would
 * not compile); string literals are reproduced exactly as published.
 */
@Service
public class BatchJobRunner {

    @Autowired
    ApplicationContext context;

    @Autowired
    @Qualifier("joblauncherasync")
    JobLauncher jobLauncherAsync;

    /**
     * Launches the job named by {@code batchJob}, waits 15 seconds and marks it completed.
     * On failure the error description is recorded on {@code batchJob} instead.
     *
     * @param batchJob descriptor carrying the job bean name; updated in place
     * @return the same {@code batchJob} instance
     */
    public BatchJob execute(BatchJob batchJob) {
        try {
            System.out.println(" batchjob" + batchJob.getJobName() + " called ...");
            jobLauncherAsync.run(
                    getJob(batchJob.getJobName()), getJobParameters(batchJob.getJobName()));
            // NOTE(review): the flag below is set after a fixed wait, not after inspecting
            // the launched execution's status — confirm this is the intended meaning.
            Thread.sleep(15000);
            batchJob.setIsCompleted(true);
            System.out.println(" batchjob" + batchJob.getJobName() + " completed ...");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
            recordFailure(batchJob, e);
        } catch (Exception e) {
            recordFailure(batchJob, e);
        }
        // The published code returned an undefined variable; the parameter is the only
        // BatchJob in scope.
        return batchJob;
    }

    /** Prints the failure and stores its description on the job descriptor. */
    private void recordFailure(BatchJob batchJob, Exception e) {
        System.out.println("exception " + e.getMessage());
        // getMessage() may be null; fall back to toString() rather than throwing here.
        batchJob.setErrorDesc(e.getMessage() != null ? e.getMessage() : e.toString());
        e.printStackTrace();
    }

    /**
     * @param jobName bean name of the job
     * @return the {@link Job} bean registered under {@code jobName}
     */
    public Job getJob(String jobName) {
        return (Job) context.getBean(jobName);
    }

    /**
     * Builds identifying parameters: a random UUID, the job name and the current time.
     *
     * @param jobName value stored under the {@code job_name} parameter
     * @return the assembled job parameters
     */
    public JobParameters getJobParameters(String jobName) {
        return new JobParametersBuilder()
                .addString("unique_id", UUID.randomUUID().toString(), true)
                .addString("job_name", jobName, true)
                .addDate("execution_start_date", Date.from(Instant.now()), true)
                .toJobParameters();
    }
}
登录后复制
BatchJob.java
/**
 * Mutable descriptor of one batch job run: the job bean name, a completion flag and an
 * error description.
 *
 * <p>The accessors and builder are spelled out because the other snippets call
 * {@code builder()}, {@code getJobName()}, {@code setIsCompleted()}, {@code setErrorDesc()}
 * and {@code getIsCompleted()}, none of which the published class declared.
 */
public class BatchJob {

    private String jobName;
    /** {@code null} until a value is set explicitly. */
    private Boolean isCompleted;
    private String errorDesc;

    /** Creates an empty descriptor with all fields {@code null}. */
    public BatchJob() {
    }

    private BatchJob(Builder builder) {
        this.jobName = builder.jobName;
        this.isCompleted = builder.isCompleted;
        this.errorDesc = builder.errorDesc;
    }

    /** @return a new builder with all fields unset */
    public static Builder builder() {
        return new Builder();
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public Boolean getIsCompleted() {
        return isCompleted;
    }

    public void setIsCompleted(Boolean isCompleted) {
        this.isCompleted = isCompleted;
    }

    public String getErrorDesc() {
        return errorDesc;
    }

    public void setErrorDesc(String errorDesc) {
        this.errorDesc = errorDesc;
    }

    /** Fluent builder for {@link BatchJob}; unset fields stay {@code null}. */
    public static final class Builder {

        private String jobName;
        private Boolean isCompleted;
        private String errorDesc;

        private Builder() {
        }

        public Builder jobName(String jobName) {
            this.jobName = jobName;
            return this;
        }

        public Builder isCompleted(Boolean isCompleted) {
            this.isCompleted = isCompleted;
            return this;
        }

        public Builder errorDesc(String errorDesc) {
            this.errorDesc = errorDesc;
            return this;
        }

        public BatchJob build() {
            return new BatchJob(this);
        }
    }
}
登录后复制
作业在逐一或按顺序执行时可以成功运行。但是,在使用 CompletableFuture 并行执行时遇到了问题。这是同时启动多个 Spring Batch 作业的正确方法吗?
解决方法
1.将以下属性添加到application.properties
# NOTE(review): presumably required so that the jobLauncher bean defined in BatchConfig.java
# can replace an existing bean registered under the same name — confirm.
spring.main.allow-bean-definition-overriding=true
2.更新了 jobLauncher(),如下 BatchConfig.java 中所示
/**
 * Defines the application's job launcher, running jobs on {@code simpleTaskExecutor()}.
 *
 * <p>Identifier casing has been restored; the published snippet was lowercased and would
 * not compile. The bean name is restored to {@code jobLauncher} as well.
 * NOTE(review): the accompanying text enables bean-definition overriding, which only
 * matters if this name collides with an existing {@code jobLauncher} bean — confirm the name.
 *
 * @param dataSource    not used by this method; kept for signature compatibility
 * @param jobRepository repository handed to the launcher
 * @return an initialised {@link TaskExecutorJobLauncher}
 * @throws Exception if {@code afterPropertiesSet()} fails
 */
@Bean("jobLauncher")
public JobLauncher jobLauncher(DataSource dataSource, JobRepository jobRepository)
        throws Exception {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(simpleTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}
登录后复制
删除了 CompletableFuture。
删除了 BatchExecutorService.java
在 BatchJobManager.java 中添加了以下方法来调用作业。
/**
 * Hands each job to {@code batchJobRunner.execute} one after another, in list order.
 *
 * <p>The element type is restored to {@code List<BatchJob>}: iterating a raw {@code List}
 * with a {@code BatchJob} loop variable does not compile.
 *
 * @param batchJobList jobs to launch
 * @throws Exception declared for compatibility with the published signature
 */
public void launchSync(List<BatchJob> batchJobList) throws Exception {
    for (BatchJob batchJob : batchJobList) {
        batchJobRunner.execute(batchJob);
    }
}
登录后复制
以上就是尝试同时启动两个或多个 Spring Batch 作业时,会抛出错误:ORA-08177: 无法序列化此事务的访问的详细内容,更多请关注【创想鸟】其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。
发布者:PHP中文网,转载请注明出处:https://www.chuangxiangniao.com/p/2621968.html