Skip to content

Commit

Permalink
make the http fetch more foolproof for parallel execution.
Browse files Browse the repository at this point in the history
  • Loading branch information
j-fbriere committed Jan 12, 2024
1 parent ab61bb1 commit e15b068
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 187 deletions.
106 changes: 4 additions & 102 deletions lib/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import 'dart:convert';

import 'package:dart_twitter_api/src/utils/date_utils.dart';
import 'package:dart_twitter_api/twitter_api.dart';
import 'package:faker/faker.dart';
import 'package:ffcache/ffcache.dart';
import 'package:squawker/generated/l10n.dart';
import 'package:squawker/profile/profile_model.dart';
Expand All @@ -14,23 +13,14 @@ import 'package:squawker/client_android.dart';
import 'package:http/http.dart' as http;
import 'package:logging/logging.dart';
import 'package:quiver/iterables.dart';
import 'package:synchronized/synchronized.dart';

const Duration _defaultTimeout = Duration(seconds: 30);
const String _accessToken = 'AAAAAAAAAAAAAAAAAAAAAGHtAgAAAAAA%2Bx7ILXNILCqkSGIzy6faIHZ9s3Q%3DQy97w6SIrzE7lQwPJEYQBsArEE2fC25caFwRBvAGi456G09vGR';

class _SquawkerTwitterClient extends TwitterClient {
static final log = Logger('_SquawkerTwitterClient');

_SquawkerTwitterClient() : super(consumerKey: '', consumerSecret: '', token: '', secret: '');

static final Lock _lock = Lock();
static Completer? _guestTokenCompleter;
static String? _guestToken;
static int _expiresAt = -1;
static int _tokenLimit = -1;
static int _tokenRemaining = -1;

@override
Future<http.Response> get(Uri uri, {Map<String, String>? headers, Duration? timeout}) async {
return getWithRateFetchCtx(uri, headers: headers, timeout: timeout);
Expand All @@ -55,89 +45,6 @@ class _SquawkerTwitterClient extends TwitterClient {
}
}

static Future<String> getToken() async {
var guestToken = await _lock.synchronized(() async {
if (_guestToken != null) {
_guestTokenCompleter = null;
// If we don't have an expiry or limit, it's probably because we haven't made a request yet, so assume they're OK
if (_expiresAt == -1 && _tokenLimit == -1 && _tokenRemaining == -1) {
return _guestToken!;
}

// Check if the token we have hasn't expired yet
if (DateTime.now().millisecondsSinceEpoch < _expiresAt) {
// Check if the token we have still has usages remaining
if (_tokenRemaining < _tokenLimit) {
return _guestToken!;
}
}
}

if (_guestTokenCompleter != null) {
return _guestTokenCompleter!.future;
}
_guestTokenCompleter = Completer();

// Otherwise, fetch a new token
_guestToken = null;
_tokenLimit = -1;
_tokenRemaining = -1;
_expiresAt = -1;

return null;
});
if (guestToken != null) {
return guestToken;
}

log.info('Refreshing the Twitter token');

var response = await http.post(Uri.parse('https://api.twitter.com/1.1/guest/activate.json'), headers: {
'Authorization': 'Bearer $_accessToken',
});

if (response.statusCode == 200) {
var result = jsonDecode(response.body);
if (result.containsKey('guest_token')) {
_guestToken = result['guest_token'];

_guestTokenCompleter!.complete(_guestToken!);
return _guestToken!;
}
}

var exc = Exception('Unable to refresh the token. The response (${response.statusCode}) from Twitter was: ${response.body}');
_guestTokenCompleter!.completeError(exc);
throw exc;
}

static Future<http.Response> fetch(Uri uri, {Map<String, String>? headers}) async {
log.info('Fetching $uri');

var response = await http.get(uri, headers: {
...?headers,
'authorization': 'Bearer $_accessToken',
'x-guest-token': await getToken(),
'x-twitter-active-user': 'yes',
'user-agent': faker.internet.userAgent()
});

var headerRateLimitReset = response.headers['x-rate-limit-reset'];
var headerRateLimitRemaining = response.headers['x-rate-limit-remaining'];
var headerRateLimitLimit = response.headers['x-rate-limit-limit'];

if (headerRateLimitReset == null || headerRateLimitRemaining == null || headerRateLimitLimit == null) {
// If the rate limit headers are missing, the endpoint probably doesn't send them back
return response;
}

// Update our token's rate limit counters
_expiresAt = int.parse(headerRateLimitReset) * 1000;
_tokenRemaining = int.parse(headerRateLimitRemaining);
_tokenLimit = int.parse(headerRateLimitLimit);

return response;
}
}

class UnknownProfileResultType implements Exception {
Expand Down Expand Up @@ -585,19 +492,14 @@ class Twitter {
return tweet;
}

static Future<List<UserWithExtra>> searchUsers(String query, {int limit = 25, String? maxId, String? cursor}) async {
static Future<List<UserWithExtra>> searchUsers(String query, {int limit = 25, int? page}) async {
var queryParameters = {
...defaultParams,
'count': limit.toString(),
'max_id': maxId,
'q': query,
'pc': '1',
'spelling_corrections': '1',
'result_filter': 'user'
'q': query
};

if (cursor != null) {
queryParameters['cursor'] = cursor;
if (page != null) {
queryParameters['page'] = page.toString();
}

var response = await _twitterApi.client.get(Uri.https('api.twitter.com', '/1.1/users/search.json', queryParameters));
Expand Down
166 changes: 81 additions & 85 deletions lib/client_android.dart
Original file line number Diff line number Diff line change
Expand Up @@ -228,60 +228,54 @@ class TwitterAndroid {
}
Map? currentGuestAccountTokens = _guestAccountTokens;
_guestAccountTokens = null;
Map guestAccountTokens = await _lock.synchronized(() async {
if (_guestAccountTokens != null) {
return _guestAccountTokens!;
}
try {
var repository = await Repository.writable();
try {
var repository = await Repository.writable();

int guestAccountIndex = _guestAccountIndex;
if (forceNewAccount) {
guestAccountIndex++;
}
var guestAccountDbData = await repository.query(tableGuestAccount, orderBy: 'created_at ASC');
if (guestAccountDbData.isNotEmpty) {
if (guestAccountDbData.length > guestAccountIndex) {
var guestAccountDb = guestAccountDbData[guestAccountIndex];
_guestAccountTokens = {
'oauthConsumerKey': _oauthConsumerKey,
'oauthConsumerSecret': _oauthConsumerSecret,
'oauthToken': guestAccountDb['oauth_token'],
'oauthTokenSecret': guestAccountDb['oauth_token_secret']
};
_guestAccountIndex = guestAccountIndex;
bool mustInsert = !_rateLimits.containsKey(guestAccountDb['oauth_token']);
_setRateLimits();
if (mustInsert) {
await _saveRateLimits(insert: true);
}
return _guestAccountTokens!;
int guestAccountIndex = _guestAccountIndex;
if (forceNewAccount) {
guestAccountIndex++;
}
var guestAccountDbData = await repository.query(tableGuestAccount, orderBy: 'created_at ASC');
if (guestAccountDbData.isNotEmpty) {
if (guestAccountDbData.length > guestAccountIndex) {
var guestAccountDb = guestAccountDbData[guestAccountIndex];
_guestAccountTokens = {
'oauthConsumerKey': _oauthConsumerKey,
'oauthConsumerSecret': _oauthConsumerSecret,
'oauthToken': guestAccountDb['oauth_token'],
'oauthTokenSecret': guestAccountDb['oauth_token_secret']
};
_guestAccountIndex = guestAccountIndex;
bool mustInsert = !_rateLimits.containsKey(guestAccountDb['oauth_token']);
_setRateLimits();
if (mustInsert) {
await _saveRateLimits(insert: true);
}
return _guestAccountTokens!;
}

String accessToken = await _getAccessToken();
String guestToken = await _getGuestToken(accessToken);
String flowToken = await _getFlowToken(accessToken, guestToken);
var guestAccount = await _getGuestAccountFromTwitter(accessToken, guestToken, flowToken);
_guestAccountTokens = {
'oauthConsumerKey': _oauthConsumerKey,
'oauthConsumerSecret': _oauthConsumerSecret,
'oauthToken': guestAccount['oauth_token'],
'oauthTokenSecret': guestAccount['oauth_token_secret']
};

await repository.insert(tableGuestAccount, guestAccount);
_guestAccountIndex = guestAccountIndex;
_setRateLimits();
await _saveRateLimits(insert: true);
return _guestAccountTokens!;
}
catch (err) {
_guestAccountTokens = currentGuestAccountTokens;
rethrow;
}
});
return guestAccountTokens;

String accessToken = await _getAccessToken();
String guestToken = await _getGuestToken(accessToken);
String flowToken = await _getFlowToken(accessToken, guestToken);
var guestAccount = await _getGuestAccountFromTwitter(accessToken, guestToken, flowToken);
_guestAccountTokens = {
'oauthConsumerKey': _oauthConsumerKey,
'oauthConsumerSecret': _oauthConsumerSecret,
'oauthToken': guestAccount['oauth_token'],
'oauthTokenSecret': guestAccount['oauth_token_secret']
};

await repository.insert(tableGuestAccount, guestAccount);
_guestAccountIndex = guestAccountIndex;
_setRateLimits();
await _saveRateLimits(insert: true);
return _guestAccountTokens!;
}
catch (err) {
_guestAccountTokens = currentGuestAccountTokens;
rethrow;
}
}

static String hmacSHA1(String key, String text) {
Expand Down Expand Up @@ -378,48 +372,50 @@ class TwitterAndroid {
}

static Future<http.Response> fetch(Uri uri, {Map<String, String>? headers, RateFetchContext? fetchContext}) async {
http.Response? rsp;
bool endLoop = false;
RateLimitException? lastExc;
bool longDelayExc = false;
try {
rsp = await _doFetch(uri, headers: headers, fetchContext: fetchContext);
}
on RateLimitException catch (_, ex) {
lastExc = _;
longDelayExc = _.longDelay;
}
while (!endLoop && (longDelayExc || (rsp != null && rsp.statusCode >= 400 && rsp.body.contains('Rate limit exceeded')))) {
return await _lock.synchronized(() async {
http.Response? rsp;
bool endLoop = false;
RateLimitException? lastExc;
bool longDelayExc = false;
try {
// retry the request, but first get or create a new guest account.
lastExc = null;
longDelayExc = false;
rsp = await _doFetch(uri, headers: headers, fetchContext: fetchContext, forceNewAccount: true);
if (rsp.statusCode >= 400 && rsp.body.contains('Rate limit exceeded')) {
// Twitter/X API documentation specify a 24 hours waiting time, but I experimented a 12 hours embargo.
_rateLimitRemaining[uri.path] = 0;
_rateLimitReset[uri.path] = DateTime.now().add(const Duration(hours: 12)).millisecondsSinceEpoch;
await _saveRateLimits();
}
rsp = await _doFetch(uri, headers: headers, fetchContext: fetchContext);
}
on RateLimitException catch (_, ex) {
lastExc = _;
longDelayExc = _.longDelay;
}
on GuestAccountException catch (_, ex) {
endLoop = true;
while (!endLoop && (longDelayExc || (rsp != null && rsp.statusCode >= 400 && rsp.body.contains('Rate limit exceeded')))) {
try {
// retry the request, but first get or create a new guest account.
lastExc = null;
longDelayExc = false;
rsp = await _doFetch(uri, headers: headers, fetchContext: fetchContext, forceNewAccount: true);
if (rsp.statusCode >= 400 && rsp.body.contains('Rate limit exceeded')) {
// Twitter/X API documentation specify a 24 hours waiting time, but I experimented a 12 hours embargo.
_rateLimitRemaining[uri.path] = 0;
_rateLimitReset[uri.path] = DateTime.now().add(const Duration(hours: 12)).millisecondsSinceEpoch;
await _saveRateLimits();
}
}
on RateLimitException catch (_, ex) {
lastExc = _;
longDelayExc = _.longDelay;
}
on GuestAccountException catch (_, ex) {
endLoop = true;
}
}
}
if (lastExc != null) {
log.warning('*** ${lastExc.message}');
throw lastExc;
}
if (rsp != null && rsp.statusCode >= 400 && rsp.body.contains('Rate limit exceeded')) {
// Twitter/X API documentation specify a 24 hours waiting time, but I experimented a 12 hours embargo.
log.warning('*** Twitter/X server Error: The request ${uri.path} has exceeded its rate limit. Will have to wait 12 hours!');
throw RateLimitException('The request ${uri.path} has reached its limit. Please wait 12 hours.', longDelay: true);
}
return rsp!;
if (lastExc != null) {
log.warning('*** ${lastExc.message}');
throw lastExc;
}
if (rsp != null && rsp.statusCode >= 400 && rsp.body.contains('Rate limit exceeded')) {
// Twitter/X API documentation specify a 24 hours waiting time, but I experimented a 12 hours embargo.
log.warning('*** Twitter/X server Error: The request ${uri.path} has exceeded its rate limit. Will have to wait 12 hours!');
throw RateLimitException('The request ${uri.path} has reached its limit. Please wait 12 hours.', longDelay: true);
}
return rsp!;
});
}

}
Expand Down

0 comments on commit e15b068

Please sign in to comment.