Skip to content

Commit

Permalink
Revert "modified read log file"
Browse files Browse the repository at this point in the history
This reverts commit 57decf3.
  • Loading branch information
wankunde committed Apr 24, 2015
1 parent 2401f8a commit 2d17ac4
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 174 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ g. 注意事项
jar xvf recsys-1.0.jar


### 系统运行
#### 系统运行

客户端

Expand Down
141 changes: 92 additions & 49 deletions src/main/java/com/wankun/logcount/kafka/TailLog.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.wankun.logcount.kafka;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.CharBuffer;
import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
Expand All @@ -16,7 +15,10 @@ public class TailLog extends Thread {

private BlockingQueue<String> queue;
private String logname;
ByteBuffer buf = ByteBuffer.allocate(4096);

private CharBuffer buf = CharBuffer.allocate(4096);

// private ByteBuffer buf = ByteBuffer.allocate(4096);

public TailLog(BlockingQueue<String> queue, String logname) {
this.queue = queue;
Expand All @@ -25,65 +27,106 @@ public TailLog(BlockingQueue<String> queue, String logname) {

@Override
public void run() {
RandomAccessFile rfile = null;
FileChannel ch = null;
BufferedReader reader = null;
try {
rfile = new RandomAccessFile(new File(logname), "r");
ch = rfile.getChannel();
// Path logpath=Paths.get(logname);
// File posfile =
// logpath.getParent().resolve("."+logpath.getFileName()+".pos").toFile();
reader = new BufferedReader(new FileReader(new File(logname)));

int pos = 0;
int limit = 0;
long filesize = 0;
while (true) {
if (ch.read(buf) > 0) {
limit = buf.limit();
pos = 0;
buf.flip();
while (buf.hasRemaining()) {
byte b = buf.get();
if (b == '\n') {
int len = buf.position();
buf.position(pos);
buf.limit(len);
String line = Charset.forName("GBK").decode(buf.slice()).toString();
logger.debug("new line --> "+line);
queue.put(line);
pos = len;
buf.position(len);
buf.limit(limit);
// 判断文件是否已经切换
if (filesize > new File(logname).length()) {
logger.debug("filesize :{} current system file size :{} . Log file switchover!", filesize,
new File(logname).length());
try {
// 在切换读文件前,读取文件全部内容
StringBuilder line = new StringBuilder();
while (reader.read(buf) > 0) {
buf.flip();
synchronized (buf) {
// 读buffer 并解析
for (int i = 0; i < buf.limit(); i++) {
char c = buf.get();
line.append(c);
if ((c == '\n') || (c == '\r'))
if (line.length() > 0) {
queue.put(line.toString());
line = new StringBuilder();
}
}
}
}
}
buf.position(pos);
buf.compact();
} else {
Thread.currentThread().sleep(1000);
// 文件已经切换
if (ch.position() > rfile.length()) {
String line = Charset.forName("GBK").decode(buf.slice()).toString();
queue.put(line);
queue.put(line.toString());
buf.clear();

ch.close();
rfile.close();
rfile = new RandomAccessFile(new File(logname), "r");
ch = rfile.getChannel();
// 切换读文件
if (reader != null)
reader.close();
reader = new BufferedReader(new FileReader(new File(logname)));
} catch (Exception e) {
logger.error("文件 {} 不存在", logname, e);
Thread.currentThread().sleep(10000);
continue;
}
}

for (int retrys = 10; retrys > 0; retrys--) {
int bufread = reader.read(buf);
if (bufread < 0) {
if (retrys > 0)
Thread.currentThread().sleep(1000);
else {
// 等待10s后无新数据读出
synchronized (buf) {
// 等待 cachetime 秒后文件仍未写入
buf.flip();
char[] dst = new char[buf.length()];
buf.get(dst);
buf.clear();
queue.put(new String(dst));
}
}
} else {
filesize = new File(logname).length();
retrys = -1;

buf.flip();
synchronized (buf) {
// 读buffer 并解析
StringBuilder line = new StringBuilder();
for (int i = 0; i < buf.limit(); i++) {
char c = buf.get();
line.append(c);
if ((c == '\n') || (c == '\r'))
if (line.length() > 0) {
queue.put(line.toString());
line = new StringBuilder();
}
}
// 接着写不完整的数据
buf.compact();
if (line.length() > 0) {
buf.append(line);
}
}
break;
}
}
}
} catch (Exception e) {
logger.error("文件读取失败", e);
} finally {
if (ch != null)
try {
ch.close();
} catch (IOException e) {
logger.error("文件 FileChannel ch 关闭失败", e);
}
if (rfile != null)
if (reader != null) {
try {
rfile.close();
reader.close();
} catch (IOException e) {
logger.error("文件 RandomAccessFile rfile 关闭失败", e);
logger.error("文件 reader 关闭失败", e);
}
}
}

}

}
122 changes: 0 additions & 122 deletions src/main/java/com/wankun/logcount/kafka/TailLogBak.java

This file was deleted.

4 changes: 2 additions & 2 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
<!-- 文件输出日志 (文件大小策略进行文件输出,超过指定大小对文件备份) -->
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>logs/logcount.log</File>
<File>logs/recsys.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<FileNamePattern>logs/logcount.log.%i.bak</FileNamePattern>
<FileNamePattern>logs/recsys.log.%i.bak</FileNamePattern>
<MinIndex>1</MinIndex>
<MaxIndex>12</MaxIndex>
</rollingPolicy>
Expand Down

0 comments on commit 2d17ac4

Please sign in to comment.