Skip to content

Commit

Permalink
* 新增TransactionEndpointAware接口支持
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyangming committed Dec 14, 2016
1 parent b3f3f77 commit 843fcba
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,20 @@
import org.bytesoft.transaction.TransactionRepository;
import org.bytesoft.transaction.archive.TransactionArchive;
import org.bytesoft.transaction.aware.TransactionBeanFactoryAware;
import org.bytesoft.transaction.aware.TransactionEndpointAware;
import org.bytesoft.transaction.internal.TransactionException;
import org.bytesoft.transaction.logging.TransactionLogger;
import org.bytesoft.transaction.xa.TransactionXid;
import org.bytesoft.transaction.xa.XidFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionCoordinator implements RemoteCoordinator, TransactionBeanFactoryAware {
public class TransactionCoordinator implements RemoteCoordinator, TransactionBeanFactoryAware, TransactionEndpointAware {
static final Logger logger = LoggerFactory.getLogger(TransactionCoordinator.class.getSimpleName());

private String endpoint;
private TransactionBeanFactory beanFactory;

public String getIdentifier() {
throw new IllegalStateException();
}

public Transaction getTransactionQuietly() {
TransactionManager transactionManager = this.beanFactory.getTransactionManager();
return transactionManager.getTransactionQuietly();
Expand Down Expand Up @@ -480,6 +478,14 @@ public boolean setTransactionTimeout(int seconds) throws XAException {
return false;
}

public void setEndpoint(String identifier) {
this.endpoint = identifier;
}

public String getIdentifier() {
return this.endpoint;
}

public void setBeanFactory(TransactionBeanFactory tbf) {
this.beanFactory = tbf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,19 @@ public void begin() throws NotSupportedException, SystemException {
throw new NotSupportedException();
}

XidFactory xidFactory = this.beanFactory.getXidFactory();
RemoteCoordinator transactionCoordinator = this.beanFactory.getTransactionCoordinator();

int timeoutSeconds = this.timeoutSeconds;

TransactionContext transactionContext = new TransactionContext();
transactionContext.setPropagatedBy(transactionCoordinator.getIdentifier());
transactionContext.setCoordinator(true);
long createdTime = System.currentTimeMillis();
long expiredTime = createdTime + (timeoutSeconds * 1000L);
transactionContext.setCreatedTime(createdTime);
transactionContext.setExpiredTime(expiredTime);
XidFactory xidFactory = this.beanFactory.getXidFactory();

TransactionXid globalXid = xidFactory.createGlobalXid();
transactionContext.setXid(globalXid);

Expand Down Expand Up @@ -197,8 +201,8 @@ public Transaction suspend() throws SystemException {
return transaction;
}

public void resume(javax.transaction.Transaction tobj) throws InvalidTransactionException, IllegalStateException,
SystemException {
public void resume(javax.transaction.Transaction tobj)
throws InvalidTransactionException, IllegalStateException, SystemException {

if (TransactionImpl.class.isInstance(tobj) == false) {
throw new InvalidTransactionException();
Expand Down Expand Up @@ -270,8 +274,8 @@ public void timingExecution() {
if (expired <= current) {
expiredTransactions.add(transaction);
}
}// end-if (transaction.isTiming())
}// end-synchronized
} // end-if (transaction.isTiming())
} // end-synchronized
}

Iterator<Transaction> expiredItr = expiredTransactions.iterator();
Expand All @@ -288,8 +292,8 @@ public void timingExecution() {
} catch (Exception ex) {
transactionRepository.putErrorTransaction(globalXid, transaction);
}
}// end-else
}// end-while
} // end-else
} // end-while
}

public void stopTiming(Transaction tx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ private TransactionImpl reconstructTransaction(TransactionArchive archive) throw
transactionContext.setXid(xidFactory.createGlobalXid(xid.getGlobalTransactionId()));
transactionContext.setRecoveried(true);
transactionContext.setCoordinator(archive.isCoordinator());
transactionContext.setPropagatedBy(archive.getPropagatedBy());

TransactionImpl transaction = new TransactionImpl(transactionContext);
transaction.setBeanFactory(this.beanFactory);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Copyright 2014-2016 yangming.liu<[email protected]>.
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, see <http://www.gnu.org/licenses/>.
*/
package org.bytesoft.bytejta.supports.spring;

import java.util.ArrayList;
import java.util.List;

import org.bytesoft.common.utils.CommonUtils;
import org.bytesoft.transaction.TransactionBeanFactory;
import org.bytesoft.transaction.aware.TransactionBeanFactoryAware;
import org.bytesoft.transaction.aware.TransactionEndpointAware;
import org.springframework.beans.BeansException;
import org.springframework.beans.FatalBeanException;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.PropertyValue;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;

import com.alibaba.dubbo.config.ProtocolConfig;

public class TransactionEndpointPostProcessor implements BeanFactoryPostProcessor, TransactionBeanFactoryAware {
private TransactionBeanFactory beanFactory;

public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
ClassLoader cl = Thread.currentThread().getContextClassLoader();

BeanDefinition protocolDef = null;

List<BeanDefinition> beanDefList = new ArrayList<BeanDefinition>();
String[] beanNameArray = beanFactory.getBeanDefinitionNames();
for (int i = 0; i < beanNameArray.length; i++) {
String beanName = beanNameArray[i];
BeanDefinition beanDef = beanFactory.getBeanDefinition(beanName);
String beanClassName = beanDef.getBeanClassName();

Class<?> beanClass = null;
try {
beanClass = cl.loadClass(beanClassName);
} catch (Exception ex) {
continue;
}

if (TransactionEndpointAware.class.isAssignableFrom(beanClass)) {
beanDefList.add(beanDef);
} else if (ProtocolConfig.class.isAssignableFrom(beanClass)) {
if (protocolDef == null) {
protocolDef = beanDef;
} else {
throw new FatalBeanException("There are more than one com.alibaba.dubbo.config.ProtocolConfig was found!");
}
}
}

if (protocolDef == null) {
throw new FatalBeanException("No configuration of class com.alibaba.dubbo.config.ProtocolConfig was found.");
}

MutablePropertyValues protocolValues = protocolDef.getPropertyValues();
PropertyValue protocolValue = protocolValues.getPropertyValue("port");
if (protocolValue == null || protocolValue.getValue() == null) {
throw new FatalBeanException("Attribute 'port' of <dubbo:protocol ... /> is null.");
}

String host = CommonUtils.getInetAddress();
String port = String.valueOf(protocolValue.getValue());
String identifier = String.format("%s:%s", host, port);

for (int i = 0; i < beanDefList.size(); i++) {
BeanDefinition beanDef = beanDefList.get(i);
MutablePropertyValues mpv = beanDef.getPropertyValues();
mpv.addPropertyValue(TransactionEndpointAware.ENDPOINT_FIELD_NAME, identifier);
}

}

public void initializeCoordinator(ConfigurableListableBeanFactory beanFactory, BeanDefinition protocolDef,
String compensableBeanId) throws BeansException {
MutablePropertyValues mpv = protocolDef.getPropertyValues();
PropertyValue pv = mpv.getPropertyValue("port");
if (pv == null || pv.getValue() == null) {
throw new FatalBeanException("Attribute 'port' of <dubbo:protocol ... /> is null.");
}

String host = CommonUtils.getInetAddress();
String port = String.valueOf(pv.getValue());
String identifier = String.format("%s:%s", host, port);

BeanDefinition beanDef = beanFactory.getBeanDefinition(compensableBeanId);
beanDef.getPropertyValues().addPropertyValue("identifier", identifier);
}

public TransactionBeanFactory getBeanFactory() {
return beanFactory;
}

public void setBeanFactory(TransactionBeanFactory beanFactory) {
this.beanFactory = beanFactory;
}

}
3 changes: 2 additions & 1 deletion bytejta-supports/src/main/resources/bytejta.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@

<description>The bytejta transaction manager module</description>

<bean class="org.bytesoft.bytejta.supports.spring.ManagedConnectionFactoryPostProcessor" />
<bean class="org.bytesoft.bytejta.supports.spring.TransactionBeanFactoryPostProcessor" />
<bean class="org.bytesoft.bytejta.supports.spring.TransactionEndpointPostProcessor" />
<bean class="org.bytesoft.bytejta.supports.spring.ManagedConnectionFactoryPostProcessor" />

<bean id="bytejtaTransactionManager" class="org.bytesoft.bytejta.TransactionManagerImpl" />
<bean id="bytejtaTransactionCoordinator" class="org.bytesoft.bytejta.TransactionCoordinator" />
Expand Down

0 comments on commit 843fcba

Please sign in to comment.