Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Implement goburrow#25
Browse files Browse the repository at this point in the history
  • Loading branch information
andig committed Oct 10, 2018
1 parent 0d0a427 commit 7c6b730
Showing 1 changed file with 75 additions and 40 deletions.
115 changes: 75 additions & 40 deletions tcpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,26 +84,7 @@ func (mb *tcpPackager) Encode(pdu *ProtocolDataUnit) (adu []byte, err error) {

// Verify confirms transaction, protocol and unit id.
func (mb *tcpPackager) Verify(aduRequest []byte, aduResponse []byte) (err error) {
// Transaction id
responseVal := binary.BigEndian.Uint16(aduResponse)
requestVal := binary.BigEndian.Uint16(aduRequest)
if responseVal != requestVal {
err = fmt.Errorf("modbus: response transaction id '%v' does not match request '%v'", responseVal, requestVal)
return
}
// Protocol id
responseVal = binary.BigEndian.Uint16(aduResponse[2:])
requestVal = binary.BigEndian.Uint16(aduRequest[2:])
if responseVal != requestVal {
err = fmt.Errorf("modbus: response protocol id '%v' does not match request '%v'", responseVal, requestVal)
return
}
// Unit id (1 byte)
if aduResponse[6] != aduRequest[6] {
err = fmt.Errorf("modbus: response unit id '%v' does not match request '%v'", aduResponse[6], aduRequest[6])
return
}
return
return verify(aduRequest, aduResponse)
}

// Decode extracts PDU from TCP frame:
Expand Down Expand Up @@ -134,6 +115,10 @@ type tcpTransporter struct {
Timeout time.Duration
// Idle timeout to close the connection
IdleTimeout time.Duration
// Recovery timeout if tcp communication misbehaves
LinkRecoveryTimeout time.Duration
// Recovery timeout if the protocol is malformed, e.g. wrong transaction ID
ProtocolRecoveryTimeout time.Duration
// Transmission logger
Logger *log.Logger

Expand All @@ -149,31 +134,58 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error
mb.mu.Lock()
defer mb.mu.Unlock()

var data [tcpMaxLength]byte
recoveryDeadline := time.Now().Add(mb.IdleTimeout)

// Establish a new connection if not connected
if err = mb.connect(); err != nil {
return
}
// Set timer to close when idle
mb.lastActivity = time.Now()
mb.startCloseTimer()
// Set write and read timeout
var timeout time.Time
if mb.Timeout > 0 {
timeout = mb.lastActivity.Add(mb.Timeout)
}
if err = mb.conn.SetDeadline(timeout); err != nil {
return
}
// Send data
mb.logf("modbus: sending % x", aduRequest)
if _, err = mb.conn.Write(aduRequest); err != nil {
return
}
// Read header first
var data [tcpMaxLength]byte
if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err != nil {
return

for {
// Set timer to close when idle
mb.lastActivity = time.Now()
mb.startCloseTimer()
// Set write and read timeout
var timeout time.Time
if mb.Timeout > 0 {
timeout = mb.lastActivity.Add(mb.Timeout)
}
if err = mb.conn.SetDeadline(timeout); err != nil {
return
}
// Send data
mb.logf("modbus: sending % x", aduRequest)
if _, err = mb.conn.Write(aduRequest); err != nil {
return
}
// Read header first
if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err == nil {
aduResponse, err = mb.processResponse(data[:])
if err == nil && mb.ProtocolRecoveryTimeout > 0 && recoveryDeadline.Sub(time.Now()) > 0 &&
verify(aduRequest, aduResponse) != nil {
continue
}
mb.logf("modbus: received % x\n", aduResponse)
return
// Read attempt failed
} else if (err != io.EOF && err != io.ErrUnexpectedEOF) ||
mb.LinkRecoveryTimeout == 0 || recoveryDeadline.Sub(time.Now()) < 0 {
return
}
mb.logf("modbus: close connection and retry, because of %v", err)

mb.close()
time.Sleep(mb.LinkRecoveryTimeout)

// Establish a new connection if not connected
if err = mb.connect(); err != nil {
return
}
}
}

func (mb *tcpTransporter) processResponse(data []byte) (aduResponse []byte, err error) {
// Read length, ignore transaction & protocol id (4 bytes)
length := int(binary.BigEndian.Uint16(data[4:]))
if length <= 0 {
Expand All @@ -196,6 +208,29 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error
return
}

func verify(aduRequest []byte, aduResponse []byte) (err error) {
// Transaction id
responseVal := binary.BigEndian.Uint16(aduResponse)
requestVal := binary.BigEndian.Uint16(aduRequest)
if responseVal != requestVal {
err = fmt.Errorf("modbus: response transaction id '%v' does not match request '%v'", responseVal, requestVal)
return
}
// Protocol id
responseVal = binary.BigEndian.Uint16(aduResponse[2:])
requestVal = binary.BigEndian.Uint16(aduRequest[2:])
if responseVal != requestVal {
err = fmt.Errorf("modbus: response protocol id '%v' does not match request '%v'", responseVal, requestVal)
return
}
// Unit id (1 byte)
if aduResponse[6] != aduRequest[6] {
err = fmt.Errorf("modbus: response unit id '%v' does not match request '%v'", aduResponse[6], aduRequest[6])
return
}
return
}

// Connect establishes a new connection to the address in Address.
// Connect and Close are exported so that multiple requests can be done with one session
func (mb *tcpTransporter) Connect() error {
Expand Down

0 comments on commit 7c6b730

Please sign in to comment.