s3file.py

from urlparse import urlparse
import cStringIO
import mimetypes
import os
import datetime

__version__ = '1.3'

def s3open(*args, **kwargs):
    """ Convenience method for creating S3File object.
    """
    return S3File(*args, **kwargs)
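
# Example usage (a minimal sketch; the bucket name, key and contents are
# hypothetical -- if key/secret are omitted, boto also picks up AWS
# credentials from the environment or its config files):
#
#   with s3open("http://mybucket.s3.amazonaws.com/hello.txt") as remote:
#       remote.write("hello world\n")
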
class S3File(object):
    def __init__(self, url, key=None, secret=None, expiration_days=0,
                 private=False, content_type=None, create=True):
        from boto.s3.connection import S3Connection
        from boto.s3.key import Key

        self.url = urlparse(url)
        self.expiration_days = expiration_days
        self.buffer = cStringIO.StringIO()
        self.private = private
        self.closed = False
        self._readreq = True
        self._writereq = False
        self.content_type = content_type or mimetypes.guess_type(self.url.path)[0]

        # Strip the ".s3.amazonaws.com" suffix to recover the bare bucket name.
        bucket = self.url.netloc
        if bucket.endswith('.s3.amazonaws.com'):
            bucket = bucket[:-len('.s3.amazonaws.com')]

        self.client = S3Connection(key, secret)
        self.name = "s3://" + bucket + self.url.path
        if create:
            self.bucket = self.client.create_bucket(bucket)
        else:
            self.bucket = self.client.get_bucket(bucket, validate=False)
        self.key = Key(self.bucket)
        self.key.key = self.url.path.lstrip("/")
        self.buffer.truncate(0)
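
    # For illustration: given url = "http://mybucket.s3.amazonaws.com/a/b.txt",
    # the constructor ends up with bucket "mybucket", key "a/b.txt" and
    # self.name == "s3://mybucket/a/b.txt".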
    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def _remote_read(self):
        """ Read S3 contents into internal file buffer.
            Performed at most once; later calls are no-ops.
        """
        if self._readreq:
            self.buffer.truncate(0)
            if self.key.exists():
                self.key.get_contents_to_file(self.buffer)
            self.buffer.seek(0)
            self._readreq = False
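
    # Thanks to the _readreq flag above, the first read()/readline()/seek()
    # triggers a single S3 GET; subsequent reads are served from the local
    # in-memory buffer.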
    def _remote_write(self):
        """ Write file contents to S3 from internal buffer.
        """
        if self._writereq:
            self.truncate(self.tell())
            headers = {
                "x-amz-acl": "private" if self.private else "public-read"
            }
            if self.content_type:
                headers["Content-Type"] = self.content_type
            if self.expiration_days:
                now = datetime.datetime.utcnow()
                then = now + datetime.timedelta(self.expiration_days)
                headers["Expires"] = then.strftime("%a, %d %b %Y %H:%M:%S GMT")
                headers["Cache-Control"] = 'max-age=%d' % (self.expiration_days * 24 * 3600,)
            self.key.set_contents_from_file(self.buffer, headers=headers, rewind=True)
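
    # For example, a public upload with content_type="text/plain" and
    # expiration_days=7 sends roughly these headers (the date is illustrative):
    #   x-amz-acl: public-read
    #   Content-Type: text/plain
    #   Expires: Thu, 08 Jan 2015 00:00:00 GMT
    #   Cache-Control: max-age=604800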
    def close(self):
        """ Close the file and write contents to S3.
        """
        self._remote_write()
        self.buffer.close()
        self.closed = True

    # pass-through methods

    def flush(self):
        self._remote_write()

    def next(self):
        self._remote_read()
        return self.buffer.next()

    def read(self, size=-1):
        self._remote_read()
        return self.buffer.read(size)

    def readline(self, size=-1):
        self._remote_read()
        return self.buffer.readline(size)

    def readlines(self, sizehint=-1):
        self._remote_read()
        return self.buffer.readlines(sizehint)

    def xreadlines(self):
        self._remote_read()
        return self.buffer

    def seek(self, offset, whence=os.SEEK_SET):
        self.buffer.seek(offset, whence)
        # If it looks like we are moving within the file and we have not
        # written anything yet, we should read the remote contents first,
        # then re-apply the seek against the freshly filled buffer.
        if self.tell() != 0 and self._readreq and not self._writereq:
            self._remote_read()
            self.buffer.seek(offset, whence)

    def tell(self):
        return self.buffer.tell()
    def truncate(self, size=None):
        self._writereq = True
        # Use an explicit None check: "size or self.tell()" would wrongly
        # treat truncate(0) as truncate-to-current-position.
        self.buffer.truncate(self.tell() if size is None else size)
    def write(self, s):
        self._writereq = True
        self.buffer.write(s)

    def writelines(self, sequence):
        self._writereq = True
        self.buffer.writelines(sequence)
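
# Minimal read-side sketch (URL and credentials are assumptions; create=False
# skips the create_bucket call for a bucket that already exists):
#
#   remote = s3open("http://mybucket.s3.amazonaws.com/hello.txt", create=False)
#   data = remote.read()
#   remote.close()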