Skip to content

Commit

Permalink
remove dateutils logic for oversea version
Browse files Browse the repository at this point in the history
  • Loading branch information
guozhenhong committed Dec 25, 2018
1 parent 686db82 commit ba9d257
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public class ProtobufRecordStreamReader implements RecordReader {
private long bytesReaded = 0;
private Checksum crc = new Checksum();
private Checksum crccrc = new Checksum();
protected Calendar calendar = null;

protected boolean shouldTransform = false;

public ProtobufRecordStreamReader() {

Expand Down Expand Up @@ -115,8 +116,9 @@ public ProtobufRecordStreamReader(TableSchema schema, List<Column> columns, Inpu
this.in.setSizeLimit(Integer.MAX_VALUE);
}

public void setCalendar(Calendar calendar) {
this.calendar = calendar;

public void setTransform(boolean shouldTransform) {
this.shouldTransform = shouldTransform;
}

/**
Expand Down Expand Up @@ -246,7 +248,7 @@ private Object readField(TypeInfo type) throws IOException {
case DATETIME:{
long v = in.readSInt64();
crc.update(v);
return DateUtils.ms2date(v, calendar);
return shouldTransform ? DateUtils.ms2date(v, DateUtils.LOCAL_CAL) : new java.util.Date(v);
}
case DATE: {
long v = in.readSInt64();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,6 @@

package com.aliyun.odps.commons.proto;

import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

import org.apache.commons.io.output.CountingOutputStream;
import org.xerial.snappy.SnappyFramedOutputStream;

import com.aliyun.odps.Column;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.commons.util.DateUtils;
Expand All @@ -56,6 +40,20 @@
import com.aliyun.odps.type.TypeInfo;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
import org.apache.commons.io.output.CountingOutputStream;
import org.xerial.snappy.SnappyFramedOutputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

/**
* @author chao.liu
Expand All @@ -70,7 +68,8 @@ public class ProtobufRecordStreamWriter implements RecordWriter {
private Checksum crc = new Checksum();
private Checksum crccrc = new Checksum();
private Deflater def;
private Calendar calendar = null;

private boolean shouldTransform = false;

public ProtobufRecordStreamWriter(TableSchema schema, OutputStream out) throws IOException {
this(schema, out, new CompressOption());
Expand Down Expand Up @@ -106,8 +105,9 @@ static void writeRawBytes(byte[] value, CodedOutputStream out)
out.writeRawBytes(value);
}

public void setCalendar(Calendar calendar) {
this.calendar = calendar;

public void setTransform(boolean shouldTransform) {
this.shouldTransform = shouldTransform;
}

@Override
Expand Down Expand Up @@ -194,7 +194,14 @@ private void writeField(Object v, TypeInfo typeInfo) throws IOException {
}
case DATETIME: {
Date value = (Date) v;
Long longValue = DateUtils.date2ms(value, calendar);


Long longValue = null;
if (!shouldTransform) {
longValue = ((Date) v).getTime();
} else {
longValue = DateUtils.date2ms(value, DateUtils.LOCAL_CAL);
}
crc.update(longValue);
out.writeSInt64NoTag(longValue);
break;
Expand Down Expand Up @@ -390,6 +397,7 @@ public long getTotalBytes() {
public void write(RecordPack pack) throws IOException {
if (pack instanceof ProtobufRecordPack) {
ProtobufRecordPack pbPack = (ProtobufRecordPack) pack;
pbPack.checkTransConsistency(shouldTransform);
pbPack.getProtobufStream().writeTo(bou);
count += pbPack.getSize();
setCheckSum(pbPack.getCheckSum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.DateUtils;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.rest.ResourceBuilder;
Expand Down Expand Up @@ -167,7 +166,7 @@ public class DownloadSession {
private TableSchema schema = new TableSchema();
private DownloadStatus status = DownloadStatus.UNKNOWN;
private Configuration conf;
private boolean useLocalTZ = false;
private boolean shouldTransform = false;

private RestClient tunnelServiceClient;

Expand Down Expand Up @@ -292,7 +291,7 @@ public TunnelRecordReader openRecordReader(long start, long count, CompressOptio

TunnelRecordReader reader =
new TunnelRecordReader(start, count, columns, compress, tunnelServiceClient, this);
reader.setCalendar(useLocalTZ ? DateUtils.LOCAL_CAL : DateUtils.SHANGHAI_CAL);
reader.setTransform(shouldTransform);

return reader;
}
Expand All @@ -315,7 +314,7 @@ private void initiate() throws TunnelException {

if (resp.isOK()) {
loadFromJson(conn.getInputStream());
useLocalTZ =
shouldTransform =
StringUtils.equals(resp.getHeader(HttpHeaders.HEADER_ODPS_DATE_TRANSFORM), "true");
} else {
TunnelException e = new TunnelException(conn.getInputStream());
Expand Down Expand Up @@ -354,7 +353,7 @@ private void reload() throws TunnelException {

if (resp.isOK()) {
loadFromJson(conn.getInputStream());
useLocalTZ =
shouldTransform =
StringUtils.equals(resp.getHeader(HttpHeaders.HEADER_ODPS_DATE_TRANSFORM), "true");
} else {
TunnelException e = new TunnelException(conn.getInputStream());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
package com.aliyun.odps.tunnel;

import static com.aliyun.odps.tunnel.HttpHeaders.HEADER_ODPS_REQUEST_ID;
import static com.aliyun.odps.tunnel.TunnelConstants.TUNNEL_DATE_TRANSFORM_VERSION;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;

Expand All @@ -41,7 +41,6 @@
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Headers;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.DateUtils;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.commons.util.RetryExceedLimitException;
import com.aliyun.odps.commons.util.RetryStrategy;
Expand Down Expand Up @@ -636,9 +635,9 @@ public class UploadSession {
private final Long totalBLocks = 20000L;
private Long shares = 1L;
private Long curBlockId = 0L;

private static final int RETRY_SLEEP_SECONDS = 5;
private boolean useLocalTZ = false;
private boolean shouldTransform = false;

/**
* 根据已有的uploadId构造一个{@link UploadSession}对象
Expand Down Expand Up @@ -691,7 +690,7 @@ private void initiate() throws TunnelException {

if (resp.isOK()) {
loadFromJson(conn.getInputStream());
useLocalTZ =
shouldTransform =
StringUtils.equals(resp.getHeader(HttpHeaders.HEADER_ODPS_DATE_TRANSFORM), "true");

} else {
Expand All @@ -715,6 +714,12 @@ private void initiate() throws TunnelException {
}
}


public boolean isShouldTransform()
{
return shouldTransform;
}

/**
* 多个线程中的 {@link TunnelBufferedWriter} 将通过这个接口获得写入的 blockId
*
Expand Down Expand Up @@ -780,6 +785,7 @@ private void sendBlock(ProtobufRecordPack pack, Connection conn) throws IOExcept
if (null == conn) {
throw new IOException("Invalid connection");
}
pack.checkTransConsistency(shouldTransform);
pack.complete();
ByteArrayOutputStream baos = pack.getProtobufStream();
baos.writeTo(conn.getOutputStream());
Expand Down Expand Up @@ -841,7 +847,7 @@ public RecordWriter openRecordWriter(long blockId, CompressOption compress)
conn = getConnection(blockId, compress);
writer =
new TunnelRecordWriter(schema, conn, compress);
writer.setCalendar(getDateCalendar());
writer.setTransform(shouldTransform);
} catch (IOException e) {
if (conn != null) {
conn.disconnect();
Expand Down Expand Up @@ -943,7 +949,7 @@ private void reload() throws TunnelException {

if (resp.isOK()) {
loadFromJson(conn.getInputStream());
useLocalTZ =
shouldTransform =
StringUtils.equals(resp.getHeader(HttpHeaders.HEADER_ODPS_DATE_TRANSFORM), "true");

} else {
Expand Down Expand Up @@ -1092,14 +1098,27 @@ public RecordPack newRecordPack() throws IOException {
return newRecordPack(null);
}

/**
* 新建一个 ProtobufRecordPack,数据压缩方式 option
*
* @param option
* @throws IOException
*/
public RecordPack newRecordPack(CompressOption option) throws IOException {
ProtobufRecordPack pack = new ProtobufRecordPack(schema, new Checksum(), option);
pack.setCalendar(getDateCalendar());
return pack;
return newRecordPack(0, option);
}

private Calendar getDateCalendar() {
return useLocalTZ ? DateUtils.LOCAL_CAL : DateUtils.SHANGHAI_CAL;
/**
* 新建一个 ProtobufRecordPack,预设流 buffer 大小为 capacity, 数据压缩方式 option
*
* @param capacity
* @param option
* @throws IOException
*/
public RecordPack newRecordPack(int capacity, CompressOption option) throws IOException {
ProtobufRecordPack pack = new ProtobufRecordPack(schema, new Checksum(), capacity, option);
pack.setTransform(shouldTransform);
return pack;
}

/**
Expand Down Expand Up @@ -1190,7 +1209,7 @@ public class DownloadSession {
private Configuration conf;

private RestClient tunnelServiceClient;
private boolean useLocalTZ = false;
private boolean shouldTransform = false;

/**
* 根据已有downloadId构造一个{@link DownloadSession}对象。
Expand Down Expand Up @@ -1313,7 +1332,8 @@ public TunnelRecordReader openRecordReader(long start, long count, CompressOptio
List<Column> columns)
throws TunnelException, IOException {
TunnelRecordReader reader = new TunnelRecordReader(start, count, columns, compress, tunnelServiceClient, this);
reader.setCalendar(useLocalTZ ? DateUtils.LOCAL_CAL : DateUtils.SHANGHAI_CAL);

reader.setTransform(shouldTransform);

return reader;
}
Expand All @@ -1340,7 +1360,7 @@ private void initiate() throws TunnelException {

if (resp.isOK()) {
loadFromJson(conn.getInputStream());
useLocalTZ =
shouldTransform =
StringUtils.equals(resp.getHeader(HttpHeaders.HEADER_ODPS_DATE_TRANSFORM), "true");

} else {
Expand Down Expand Up @@ -1387,7 +1407,7 @@ private void reload() throws TunnelException {

if (resp.isOK()) {
loadFromJson(conn.getInputStream());
useLocalTZ =
shouldTransform =
StringUtils.equals(resp.getHeader(HttpHeaders.HEADER_ODPS_DATE_TRANSFORM), "true");
} else {
TunnelException e = new TunnelException(conn.getInputStream());
Expand Down Expand Up @@ -1501,7 +1521,7 @@ static HashMap<String, String> getCommonHeader() {
HashMap<String, String> headers = new HashMap<String, String>();

headers.put(Headers.CONTENT_LENGTH, String.valueOf(0));
headers.put(HttpHeaders.HEADER_ODPS_DATE_TRANSFORM, "true");
headers.put(HttpHeaders.HEADER_ODPS_DATE_TRANSFORM, TUNNEL_DATE_TRANSFORM_VERSION);
return headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ public interface TunnelConstants {
public static String MODE = "mode";
public static String STREAM_UPLOAD = "streamupload";
public static String INSTANCE_TUNNEL_LIMIT_ENABLED = "instance_tunnel_limit_enabled";
public static String TUNNEL_DATE_TRANSFORM_VERSION = "v1";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Calendar;

import com.aliyun.odps.TableSchema;
import com.aliyun.odps.commons.proto.ProtobufRecordStreamWriter;
Expand All @@ -42,8 +41,14 @@ public class ProtobufRecordPack extends RecordPack {
private TableSchema schema;
private CompressOption option = null;
private boolean isComplete = false;
private Calendar calendar = null;

private boolean shouldTransform = false;

public void checkTransConsistency(boolean expect) throws IOException {
if (shouldTransform != expect) {
throw new IOException("RecordPack breaks the restriction of session. Try session.newRecordPack()");
}
}
/**
* 新建一个ProtobufRecordPack
*
Expand Down Expand Up @@ -112,17 +117,20 @@ public ProtobufRecordPack(TableSchema schema, Checksum checksum, int capacity, C
this.schema = schema;
if (null != option) {
this.option = option;
} else {
this.option = new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
}

writer = new ProtobufRecordStreamWriter(schema, byteos, option);
writer = new ProtobufRecordStreamWriter(schema, byteos, this.option);
if (null != checksum) {
writer.setCheckSum(checksum);
}
}

public void setCalendar(Calendar calendar) {
this.calendar = calendar;
this.writer.setCalendar(calendar);

public void setTransform(boolean shouldTransform) {
this.shouldTransform = shouldTransform;
this.writer.setTransform(shouldTransform);
}

@Override
Expand Down Expand Up @@ -201,7 +209,7 @@ public void reset() throws IOException {
}
count = 0;
this.writer = new ProtobufRecordStreamWriter(schema, byteos, option);
this.writer.setCalendar(calendar);
this.writer.setTransform(shouldTransform);
isComplete = false;
}

Expand Down
Loading

0 comments on commit ba9d257

Please sign in to comment.