diff --git a/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionCoordinator.java b/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionCoordinator.java index de3bce7..2d39dda 100644 --- a/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionCoordinator.java +++ b/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionCoordinator.java @@ -36,6 +36,7 @@ import org.bytesoft.transaction.TransactionRepository; import org.bytesoft.transaction.archive.TransactionArchive; import org.bytesoft.transaction.aware.TransactionBeanFactoryAware; +import org.bytesoft.transaction.aware.TransactionEndpointAware; import org.bytesoft.transaction.internal.TransactionException; import org.bytesoft.transaction.logging.TransactionLogger; import org.bytesoft.transaction.xa.TransactionXid; @@ -43,15 +44,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TransactionCoordinator implements RemoteCoordinator, TransactionBeanFactoryAware { +public class TransactionCoordinator implements RemoteCoordinator, TransactionBeanFactoryAware, TransactionEndpointAware { static final Logger logger = LoggerFactory.getLogger(TransactionCoordinator.class.getSimpleName()); + private String endpoint; private TransactionBeanFactory beanFactory; - public String getIdentifier() { - throw new IllegalStateException(); - } - public Transaction getTransactionQuietly() { TransactionManager transactionManager = this.beanFactory.getTransactionManager(); return transactionManager.getTransactionQuietly(); @@ -480,6 +478,14 @@ public boolean setTransactionTimeout(int seconds) throws XAException { return false; } + public void setEndpoint(String identifier) { + this.endpoint = identifier; + } + + public String getIdentifier() { + return this.endpoint; + } + public void setBeanFactory(TransactionBeanFactory tbf) { this.beanFactory = tbf; } diff --git a/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionManagerImpl.java b/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionManagerImpl.java index 2b3c2f1..b8a6f99 100644 --- a/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionManagerImpl.java +++ b/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionManagerImpl.java @@ -58,15 +58,19 @@ public void begin() throws NotSupportedException, SystemException { throw new NotSupportedException(); } + XidFactory xidFactory = this.beanFactory.getXidFactory(); + RemoteCoordinator transactionCoordinator = this.beanFactory.getTransactionCoordinator(); + int timeoutSeconds = this.timeoutSeconds; TransactionContext transactionContext = new TransactionContext(); + transactionContext.setPropagatedBy(transactionCoordinator.getIdentifier()); transactionContext.setCoordinator(true); long createdTime = System.currentTimeMillis(); long expiredTime = createdTime + (timeoutSeconds * 1000L); transactionContext.setCreatedTime(createdTime); transactionContext.setExpiredTime(expiredTime); - XidFactory xidFactory = this.beanFactory.getXidFactory(); + TransactionXid globalXid = xidFactory.createGlobalXid(); transactionContext.setXid(globalXid); @@ -197,8 +201,8 @@ public Transaction suspend() throws SystemException { return transaction; } - public void resume(javax.transaction.Transaction tobj) throws InvalidTransactionException, IllegalStateException, - SystemException { + public void resume(javax.transaction.Transaction tobj) + throws InvalidTransactionException, IllegalStateException, SystemException { if (TransactionImpl.class.isInstance(tobj) == false) { throw new InvalidTransactionException(); @@ -270,8 +274,8 @@ public void timingExecution() { if (expired <= current) { expiredTransactions.add(transaction); } - }// end-if (transaction.isTiming()) - }// end-synchronized + } // end-if (transaction.isTiming()) + } // end-synchronized } Iterator expiredItr = expiredTransactions.iterator(); @@ -288,8 +292,8 @@ public void timingExecution() { } catch (Exception ex) { transactionRepository.putErrorTransaction(globalXid, transaction); } - }// end-else - }// end-while + } // end-else + } // end-while } public void stopTiming(Transaction tx) { diff --git a/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionRecoveryImpl.java b/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionRecoveryImpl.java index 9e81e33..b3af776 100644 --- a/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionRecoveryImpl.java +++ b/bytejta-core/src/main/java/org/bytesoft/bytejta/TransactionRecoveryImpl.java @@ -148,6 +148,7 @@ private TransactionImpl reconstructTransaction(TransactionArchive archive) throw transactionContext.setXid(xidFactory.createGlobalXid(xid.getGlobalTransactionId())); transactionContext.setRecoveried(true); transactionContext.setCoordinator(archive.isCoordinator()); + transactionContext.setPropagatedBy(archive.getPropagatedBy()); TransactionImpl transaction = new TransactionImpl(transactionContext); transaction.setBeanFactory(this.beanFactory); diff --git a/bytejta-supports/src/main/java/org/bytesoft/bytejta/supports/spring/TransactionEndpointPostProcessor.java b/bytejta-supports/src/main/java/org/bytesoft/bytejta/supports/spring/TransactionEndpointPostProcessor.java new file mode 100644 index 0000000..8c45773 --- /dev/null +++ b/bytejta-supports/src/main/java/org/bytesoft/bytejta/supports/spring/TransactionEndpointPostProcessor.java @@ -0,0 +1,114 @@ +/** + * Copyright 2014-2016 yangming.liu. + * + * This copyrighted material is made available to anyone wishing to use, modify, + * copy, or redistribute it subject to the terms and conditions of the GNU + * Lesser General Public License, as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this distribution; if not, see . + */ +package org.bytesoft.bytejta.supports.spring; + +import java.util.ArrayList; +import java.util.List; + +import org.bytesoft.common.utils.CommonUtils; +import org.bytesoft.transaction.TransactionBeanFactory; +import org.bytesoft.transaction.aware.TransactionBeanFactoryAware; +import org.bytesoft.transaction.aware.TransactionEndpointAware; +import org.springframework.beans.BeansException; +import org.springframework.beans.FatalBeanException; +import org.springframework.beans.MutablePropertyValues; +import org.springframework.beans.PropertyValue; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; + +import com.alibaba.dubbo.config.ProtocolConfig; + +public class TransactionEndpointPostProcessor implements BeanFactoryPostProcessor, TransactionBeanFactoryAware { + private TransactionBeanFactory beanFactory; + + public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + + BeanDefinition protocolDef = null; + + List beanDefList = new ArrayList(); + String[] beanNameArray = beanFactory.getBeanDefinitionNames(); + for (int i = 0; i < beanNameArray.length; i++) { + String beanName = beanNameArray[i]; + BeanDefinition beanDef = beanFactory.getBeanDefinition(beanName); + String beanClassName = beanDef.getBeanClassName(); + + Class beanClass = null; + try { + beanClass = cl.loadClass(beanClassName); + } catch (Exception ex) { + continue; + } + + if (TransactionEndpointAware.class.isAssignableFrom(beanClass)) { + beanDefList.add(beanDef); + } else if (ProtocolConfig.class.isAssignableFrom(beanClass)) { + if (protocolDef == null) { + protocolDef = beanDef; + } else { + throw new FatalBeanException("There are more than one com.alibaba.dubbo.config.ProtocolConfig was found!"); + } + } + } + + if (protocolDef == null) { + throw new FatalBeanException("No configuration of class com.alibaba.dubbo.config.ProtocolConfig was found."); + } + + MutablePropertyValues protocolValues = protocolDef.getPropertyValues(); + PropertyValue protocolValue = protocolValues.getPropertyValue("port"); + if (protocolValue == null || protocolValue.getValue() == null) { + throw new FatalBeanException("Attribute 'port' of is null."); + } + + String host = CommonUtils.getInetAddress(); + String port = String.valueOf(protocolValue.getValue()); + String identifier = String.format("%s:%s", host, port); + + for (int i = 0; i < beanDefList.size(); i++) { + BeanDefinition beanDef = beanDefList.get(i); + MutablePropertyValues mpv = beanDef.getPropertyValues(); + mpv.addPropertyValue(TransactionEndpointAware.ENDPOINT_FIELD_NAME, identifier); + } + + } + + public void initializeCoordinator(ConfigurableListableBeanFactory beanFactory, BeanDefinition protocolDef, + String compensableBeanId) throws BeansException { + MutablePropertyValues mpv = protocolDef.getPropertyValues(); + PropertyValue pv = mpv.getPropertyValue("port"); + if (pv == null || pv.getValue() == null) { + throw new FatalBeanException("Attribute 'port' of is null."); + } + + String host = CommonUtils.getInetAddress(); + String port = String.valueOf(pv.getValue()); + String identifier = String.format("%s:%s", host, port); + + BeanDefinition beanDef = beanFactory.getBeanDefinition(compensableBeanId); + beanDef.getPropertyValues().addPropertyValue("identifier", identifier); + } + + public TransactionBeanFactory getBeanFactory() { + return beanFactory; + } + + public void setBeanFactory(TransactionBeanFactory beanFactory) { + this.beanFactory = beanFactory; + } + +} diff --git a/bytejta-supports/src/main/resources/bytejta.xml b/bytejta-supports/src/main/resources/bytejta.xml index e65ed8d..59ba672 100644 --- a/bytejta-supports/src/main/resources/bytejta.xml +++ b/bytejta-supports/src/main/resources/bytejta.xml @@ -13,8 +13,9 @@ The bytejta transaction manager module - + +