diff --git a/janitor/functions/_numba.py b/janitor/functions/_numba.py index bf4c2a3de..e99987d90 100644 --- a/janitor/functions/_numba.py +++ b/janitor/functions/_numba.py @@ -254,7 +254,7 @@ def _numba_equi_join( return left_index, right_index -@njit(parallel=True) +@njit(parallel=True, cache=True) def _numba_equi_le_join( left_index: np.ndarray, right_index: np.ndarray, @@ -324,7 +324,7 @@ def _numba_equi_le_join( return l_index, r_index -@njit(parallel=True) +@njit(parallel=True, cache=True) def _numba_equi_ge_join( left_index: np.ndarray, right_index: np.ndarray, @@ -394,7 +394,7 @@ def _numba_equi_ge_join( return l_index, r_index -@njit(parallel=True) +@njit(parallel=True, cache=True) def _numba_equi_join_range_join( left_index: np.ndarray, right_index: np.ndarray, @@ -603,7 +603,7 @@ def _numba_single_non_equi_join( left=left, right=right, op=op, multiple_conditions=True, keep="all" ) if outcome is None: - return None, None + return None left_index, right_index, starts = outcome if op in greater_than_join_types: right_index = right_index[::-1] @@ -820,6 +820,7 @@ def _numba_multiple_non_equi_join( right_index = right_df.index._values right_is_sorted = True shape = (len(left_df), len(gt_lt)) + # use the l_booleans and r_booleans to track rows that have complete matches left_regions = np.empty(shape=shape, dtype=np.intp, order="F") l_booleans = np.zeros(len(df), dtype=np.intp) shape = (len(right_df), len(gt_lt)) @@ -834,7 +835,7 @@ def _numba_multiple_non_equi_join( keep="all", ) if outcome is None: - return None, None + return None left_indexer, right_indexer, search_indices = outcome if op in greater_than_join_types: search_indices = right_indexer.size - search_indices @@ -853,13 +854,13 @@ def _numba_multiple_non_equi_join( right_df = None booleans = l_booleans == len(gt_lt) if not booleans.any(): - return None, None + return None if not booleans.all(): left_regions = left_regions[booleans] left_index = left_index[booleans] booleans = r_booleans == 
len(gt_lt) if not booleans.any(): - return None, None + return None if not booleans.all(): right_regions = right_regions[booleans] right_index = right_index[booleans] @@ -876,7 +877,7 @@ def _numba_multiple_non_equi_join( starts = right_regions[:, 0].searchsorted(left_regions[:, 0]) booleans = starts < len(right_regions) if not booleans.any(): - return None, None + return None if not booleans.all(): starts = starts[booleans] left_regions = left_regions[booleans] @@ -889,7 +890,7 @@ def _numba_multiple_non_equi_join( search_indices = right_regions[:, 1].searchsorted(left_regions[:, 1]) booleans = search_indices < len(right_regions) if not booleans.any(): - return None, None + return None if not booleans.all(): starts = starts[booleans] search_indices = search_indices[booleans] @@ -904,7 +905,7 @@ def _numba_multiple_non_equi_join( ends = right_regions[::-1, 1].searchsorted(left_regions[:, 1]) booleans = starts < len(right_regions) if not booleans.any(): - return None, None + return None if not booleans.all(): starts = starts[booleans] left_regions = left_regions[booleans] @@ -912,7 +913,7 @@ def _numba_multiple_non_equi_join( ends = len(right_regions) - ends booleans = starts < ends if not booleans.any(): - return None, None + return None if not booleans.all(): starts = starts[booleans] left_regions = left_regions[booleans] @@ -989,30 +990,40 @@ def _numba_multiple_non_equi_join( right_indices=right_indices, ) - if (check_increasing) & (keep == "first"): - return _numba_non_equi_join_monotonic_increasing_keep_first( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ) - if (check_increasing) & (keep == "last"): - return _numba_non_equi_join_monotonic_increasing_keep_last( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ) if check_increasing: - return 
_numba_non_equi_join_monotonic_increasing_keep_all( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ) + if keep == "first": + left_indices, right_indices = ( + _numba_non_equi_join_monotonic_increasing_keep_first( + left_regions=left_regions[:, 2:], + right_regions=right_regions[:, 2:], + left_index=left_index, + right_index=right_index, + starts=starts, + ) + ) + elif keep == "last": + left_indices, right_indices = ( + _numba_non_equi_join_monotonic_increasing_keep_last( + left_regions=left_regions[:, 2:], + right_regions=right_regions[:, 2:], + left_index=left_index, + right_index=right_index, + starts=starts, + ) + ) + else: + left_indices, right_indices = ( + _numba_non_equi_join_monotonic_increasing_keep_all( + left_regions=left_regions[:, 2:], + right_regions=right_regions[:, 2:], + left_index=left_index, + right_index=right_index, + starts=starts, + ) + ) + if left_indices is None: + return None + return left_indices, right_indices if (check_decreasing) & (len(gt_lt) == 2) & (keep == "all"): start_indices = np.empty(left_index.size, dtype=np.intp) @@ -1022,7 +1033,7 @@ def _numba_multiple_non_equi_join( indices = indices[-1] left_indices = np.empty(indices, dtype=np.intp) right_indices = np.empty(indices, dtype=np.intp) - return _numba_non_equi_join_monotonic_keep_all_dual( + return _numba_non_equi_join_monotonic_decreasing_keep_all_dual( left_index=left_index, right_index=right_index, starts=starts, @@ -1035,7 +1046,7 @@ def _numba_multiple_non_equi_join( if (check_decreasing) & (len(gt_lt) == 2) & (keep == "first"): left_indices = np.empty(left_index.size, dtype=np.intp) right_indices = np.empty(left_index.size, dtype=np.intp) - return _numba_non_equi_join_monotonic_keep_first_dual( + return _numba_non_equi_join_monotonic_decreasing_keep_first_dual( left_index=left_index, right_index=right_index, starts=starts, @@ -1047,7 +1058,7 @@ def 
_numba_multiple_non_equi_join( if (check_decreasing) & (len(gt_lt) == 2) & (keep == "last"): left_indices = np.empty(left_index.size, dtype=np.intp) right_indices = np.empty(left_index.size, dtype=np.intp) - return _numba_non_equi_join_monotonic_keep_last_dual( + return _numba_non_equi_join_monotonic_decreasing_keep_last_dual( left_index=left_index, right_index=right_index, starts=starts, @@ -1056,43 +1067,56 @@ def _numba_multiple_non_equi_join( right_indices=right_indices, ) - if (check_decreasing) & (keep == "first"): - return _numba_non_equi_join_monotonic_keep_first( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - ) + if check_decreasing: + if keep == "first": + left_indices, right_indices = ( + _numba_non_equi_join_monotonic_decreasing_keep_first( + left_regions=left_regions[:, 2:], + right_regions=right_regions[:, 2:], + left_index=left_index, + right_index=right_index, + starts=starts, + ends=ends, + ) + ) - if (check_decreasing) & (keep == "last"): - return _numba_non_equi_join_monotonic_keep_last( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - ) + elif keep == "last": + left_indices, right_indices = ( + _numba_non_equi_join_monotonic_decreasing_keep_last( + left_regions=left_regions[:, 2:], + right_regions=right_regions[:, 2:], + left_index=left_index, + right_index=right_index, + starts=starts, + ends=ends, + ) + ) - if check_decreasing: - return _numba_non_equi_join_monotonic_keep_all( - left_regions=left_regions[:, 2:], - right_regions=right_regions[:, 2:], - left_index=left_index, - right_index=right_index, - starts=starts, - ends=ends, - ) + else: + left_indices, right_indices = ( + _numba_non_equi_join_monotonic_decreasing_keep_all( + left_regions=left_regions[:, 2:], + right_regions=right_regions[:, 2:], + left_index=left_index, + 
right_index=right_index, + starts=starts, + ends=ends, + ) + ) + if left_indices is None: + return None + return left_indices, right_indices # logic here is based on grantjenks' sortedcontainers # https://github.com/grantjenks/python-sortedcontainers load_factor = 1_000 width = load_factor * 2 length = ceil(right_index.size / load_factor) + # maintain a sorted array of the regions sorted_array = np.empty( (width, length), dtype=right_regions.dtype, order="F" ) + # keep track of the positions of each region + # within the sorted array positions_array = np.empty( (width, length), dtype=right_regions.dtype, order="F" ) @@ -1101,44 +1125,54 @@ def _numba_multiple_non_equi_join( # keep track of the length of actual data for each column lengths = np.empty(length, dtype=np.intp) if keep == "all": - return _numba_non_equi_join_not_monotonic_keep_all( - left_regions=left_regions[:, 1:], - right_regions=right_regions[:, 1:], - left_index=left_index, - right_index=right_index, - maxxes=maxxes, - lengths=lengths, - sorted_array=sorted_array, - positions_array=positions_array, - starts=starts, - load_factor=load_factor, + left_indices, right_indices = ( + _numba_non_equi_join_not_monotonic_keep_all( + left_regions=left_regions[:, 1:], + right_regions=right_regions[:, 1:], + left_index=left_index, + right_index=right_index, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + starts=starts, + load_factor=load_factor, + ) ) - if keep == "first": - return _numba_non_equi_join_not_monotonic_keep_first( - left_regions=left_regions[:, 1:], - right_regions=right_regions[:, 1:], - left_index=left_index, - right_index=right_index, - maxxes=maxxes, - lengths=lengths, - sorted_array=sorted_array, - positions_array=positions_array, - starts=starts, - load_factor=load_factor, + elif keep == "first": + left_indices, right_indices = ( + _numba_non_equi_join_not_monotonic_keep_first( + left_regions=left_regions[:, 1:], + 
right_regions=right_regions[:, 1:], + left_index=left_index, + right_index=right_index, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + starts=starts, + load_factor=load_factor, + ) ) # keep == 'last' - return _numba_non_equi_join_not_monotonic_keep_last( - left_regions=left_regions[:, 1:], - right_regions=right_regions[:, 1:], - left_index=left_index, - right_index=right_index, - maxxes=maxxes, - lengths=lengths, - sorted_array=sorted_array, - positions_array=positions_array, - starts=starts, - load_factor=load_factor, - ) + else: + left_indices, right_indices = ( + _numba_non_equi_join_not_monotonic_keep_last( + left_regions=left_regions[:, 1:], + right_regions=right_regions[:, 1:], + left_index=left_index, + right_index=right_index, + maxxes=maxxes, + lengths=lengths, + sorted_array=sorted_array, + positions_array=positions_array, + starts=starts, + load_factor=load_factor, + ) + ) + if left_indices is None: + return None + return left_indices, right_indices @njit(cache=True) @@ -1162,10 +1196,15 @@ def _numba_non_equi_join_not_monotonic_keep_all( length = left_index.size end = right_index.size end -= 1 + # add the last region + # no need to have this checked within an if-else statement + # in the for loop below region = right_regions[np.uint64(end), 0] sorted_array[0, 0] = region positions_array[0, 0] = end - maxes_counter = 1 + # keep track of the maxxes array + # how many cells have actual values? 
+ maxxes_counter = 1 maxxes[0] = region lengths[0] = 1 r_count = 0 @@ -1177,10 +1216,12 @@ def _numba_non_equi_join_not_monotonic_keep_all( for num in range(start, end): _num = np.uint64(num) region = right_regions[_num, 0] - arr = maxxes[:maxes_counter] + arr = maxxes[:maxxes_counter] posn = _numba_less_than(arr=arr, value=region) + # it is larger than the max in the maxxes array + # shove it into the last column if posn == -1: - posn = maxes_counter - 1 + posn = maxxes_counter - 1 posn_ = np.uint64(posn) len_arr = lengths[posn_] len_arr_ = np.uint64(len_arr) @@ -1189,57 +1230,44 @@ def _numba_non_equi_join_not_monotonic_keep_all( maxxes[posn_] = region lengths[posn_] += 1 else: - # the sorted array is an adaptation - # of grantjenks' sortedcontainers - posn_ = np.uint64(posn) - len_arr = lengths[posn_] - arr = sorted_array[:len_arr, posn_] - insort_posn = _numba_less_than(arr=arr, value=region) - # shift downwards before inserting - for ind in range(len_arr - 1, insort_posn - 1, -1): - ind_ = np.uint64(ind) - _ind = np.uint64(ind + 1) - sorted_array[_ind, posn_] = sorted_array[ind_, posn_] - positions_array[_ind, posn_] = positions_array[ind_, posn_] - insort = np.uint64(insort_posn) - sorted_array[insort, posn_] = region - positions_array[insort, posn_] = num - lengths[posn_] += 1 - maxxes[posn_] = sorted_array[np.uint64(len_arr), posn_] + sorted_array, positions_array, lengths, maxxes = ( + _numba_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + maxxes=maxxes, + lengths=lengths, + region=region, + posn=posn, + num=num, + ) + ) r_count += 1 posn_ = np.uint64(posn) + # have we exceeded the size of this column? + # do we need to trim and move data to other columns? 
check = (lengths[posn_] == (load_factor * 2)) & ( r_count < right_index.size ) if check: - # shift from left+1 to right - for pos in range(maxes_counter - 1, posn, -1): - forward = np.uint64(pos + 1) - current = np.uint64(pos) - sorted_array[:, forward] = sorted_array[:, current] - positions_array[:, forward] = positions_array[:, current] - maxxes[forward] = maxxes[current] - lengths[forward] = lengths[current] - # share half the load from left to left+1 - forward = np.uint64(posn + 1) - current = np.uint64(posn) - maxxes[forward] = sorted_array[-1, current] - lengths[forward] = load_factor - sorted_array[:load_factor, forward] = sorted_array[ - load_factor:, current - ] - positions_array[:load_factor, forward] = positions_array[ - load_factor:, current - ] - lengths[current] = load_factor - maxxes[current] = sorted_array[ - np.uint64(load_factor - 1), current - ] - maxes_counter += 1 + ( + sorted_array, + positions_array, + lengths, + maxxes, + maxxes_counter, + ) = _expand_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + lengths=lengths, + maxxes=maxxes, + posn=posn, + maxxes_counter=maxxes_counter, + load_factor=load_factor, + ) # now we do a binary search # for left region in right region l_region = left_regions[_indexer, 0] - arr = maxxes[:maxes_counter] + arr = maxxes[:maxxes_counter] posn = _numba_less_than(arr=arr, value=l_region) if posn == -1: end = start @@ -1265,7 +1293,8 @@ def _numba_non_equi_join_not_monotonic_keep_all( if counter == 0: continue total += 1 - for ind in range(posn + 1, maxes_counter): + # check the remaining columns, if any + for ind in range(posn + 1, maxxes_counter): ind_ = np.uint64(ind) len_arr = lengths[ind_] for num in range(len_arr): @@ -1294,7 +1323,7 @@ def _numba_non_equi_join_not_monotonic_keep_all( region = right_regions[np.uint64(end), 0] sorted_array[0, 0] = region positions_array[0, 0] = end - maxes_counter = 1 + maxxes_counter = 1 maxxes[0] = region lengths[0] = 1 r_count = 0 @@ -1309,10 
+1338,10 @@ def _numba_non_equi_join_not_monotonic_keep_all( for num in range(start, end): _num = np.uint64(num) region = right_regions[_num, 0] - arr = maxxes[:maxes_counter] + arr = maxxes[:maxxes_counter] posn = _numba_less_than(arr=arr, value=region) if posn == -1: - posn = maxes_counter - 1 + posn = maxxes_counter - 1 posn_ = np.uint64(posn) len_arr = lengths[posn_] len_arr_ = np.uint64(len_arr) @@ -1321,55 +1350,44 @@ def _numba_non_equi_join_not_monotonic_keep_all( maxxes[posn_] = region lengths[posn_] += 1 else: - posn_ = np.uint64(posn) - len_arr = lengths[posn_] - arr = sorted_array[:len_arr, posn_] - insort_posn = _numba_less_than(arr=arr, value=region) - # shift downwards before inserting - for ind in range(len_arr - 1, insort_posn - 1, -1): - ind_ = np.uint64(ind) - _ind = np.uint64(ind + 1) - sorted_array[_ind, posn_] = sorted_array[ind_, posn_] - positions_array[_ind, posn_] = positions_array[ind_, posn_] - insort = np.uint64(insort_posn) - sorted_array[insort, posn_] = region - positions_array[insort, posn_] = num - lengths[posn_] += 1 - maxxes[posn_] = sorted_array[np.uint64(len_arr), posn_] + sorted_array, positions_array, lengths, maxxes = ( + _numba_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + maxxes=maxxes, + lengths=lengths, + region=region, + posn=posn, + num=num, + ) + ) r_count += 1 posn_ = np.uint64(posn) + # have we reached the max size of this column? + # do we need to trim and move data to other columns? 
check = (lengths[posn_] == (load_factor * 2)) & ( r_count < right_index.size ) if check: - # shift from left+1 to right - for pos in range(maxes_counter - 1, posn, -1): - forward = np.uint64(pos + 1) - current = np.uint64(pos) - sorted_array[:, forward] = sorted_array[:, current] - positions_array[:, forward] = positions_array[:, current] - maxxes[forward] = maxxes[current] - lengths[forward] = lengths[current] - # share half the load from left to left+1 - forward = np.uint64(posn + 1) - current = np.uint64(posn) - maxxes[forward] = sorted_array[-1, current] - lengths[forward] = load_factor - sorted_array[:load_factor, forward] = sorted_array[ - load_factor:, current - ] - positions_array[:load_factor, forward] = positions_array[ - load_factor:, current - ] - lengths[current] = load_factor - maxxes[current] = sorted_array[ - np.uint64(load_factor - 1), current - ] - maxes_counter += 1 + ( + sorted_array, + positions_array, + lengths, + maxxes, + maxxes_counter, + ) = _expand_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + lengths=lengths, + maxxes=maxxes, + posn=posn, + maxxes_counter=maxxes_counter, + load_factor=load_factor, + ) # now we do a binary search # for left region in right region l_region = left_regions[_indexer, 0] - arr = maxxes[:maxes_counter] + arr = maxxes[:maxxes_counter] posn = _numba_less_than(arr=arr, value=l_region) if posn == -1: end = start @@ -1402,7 +1420,7 @@ def _numba_non_equi_join_not_monotonic_keep_all( left_indices[begin_] = l_index right_indices[begin_] = r_index begin += 1 - for ind in range(posn + 1, maxes_counter): + for ind in range(posn + 1, maxxes_counter): ind_ = np.uint64(ind) len_arr = lengths[ind_] for num in range(len_arr): @@ -1453,7 +1471,7 @@ def _numba_non_equi_join_not_monotonic_keep_first( region = right_regions[np.uint64(end), 0] sorted_array[0, 0] = region positions_array[0, 0] = end - maxes_counter = 1 + maxxes_counter = 1 maxxes[0] = region lengths[0] = 1 r_count = 0 @@ -1466,10 
+1484,10 @@ def _numba_non_equi_join_not_monotonic_keep_first( for num in range(start, end): _num = np.uint64(num) region = right_regions[_num, 0] - arr = maxxes[:maxes_counter] + arr = maxxes[:maxxes_counter] posn = _numba_less_than(arr=arr, value=region) if posn == -1: - posn = maxes_counter - 1 + posn = maxxes_counter - 1 posn_ = np.uint64(posn) len_arr = lengths[posn_] len_arr_ = np.uint64(len_arr) @@ -1478,55 +1496,42 @@ def _numba_non_equi_join_not_monotonic_keep_first( maxxes[posn_] = region lengths[posn_] += 1 else: - # the sorted array is an adaptation - # of grantjenks' sortedcontainers - posn_ = np.uint64(posn) - len_arr = lengths[posn_] - arr = sorted_array[:len_arr, posn_] - insort_posn = _numba_less_than(arr=arr, value=region) - # shift downwards before inserting - for ind in range(len_arr - 1, insort_posn - 1, -1): - ind_ = np.uint64(ind) - _ind = np.uint64(ind + 1) - sorted_array[_ind, posn_] = sorted_array[ind_, posn_] - positions_array[_ind, posn_] = positions_array[ind_, posn_] - insort = np.uint64(insort_posn) - sorted_array[insort, posn_] = region - positions_array[insort, posn_] = num - lengths[posn_] += 1 - maxxes[posn_] = sorted_array[np.uint64(len_arr), posn_] + sorted_array, positions_array, lengths, maxxes = ( + _numba_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + maxxes=maxxes, + lengths=lengths, + region=region, + posn=posn, + num=num, + ) + ) r_count += 1 posn_ = np.uint64(posn) + # have we exceeded the size of this column? + # do we need to trim and move data to other columns? 
check = (lengths[posn_] == (load_factor * 2)) & ( r_count < right_index.size ) if check: - # shift from left+1 to right - for pos in range(maxes_counter - 1, posn, -1): - forward = np.uint64(pos + 1) - current = np.uint64(pos) - sorted_array[:, forward] = sorted_array[:, current] - positions_array[:, forward] = positions_array[:, current] - maxxes[forward] = maxxes[current] - lengths[forward] = lengths[current] - # share half the load from left to left+1 - forward = np.uint64(posn + 1) - current = np.uint64(posn) - maxxes[forward] = sorted_array[-1, current] - lengths[forward] = load_factor - sorted_array[:load_factor, forward] = sorted_array[ - load_factor:, current - ] - positions_array[:load_factor, forward] = positions_array[ - load_factor:, current - ] - lengths[current] = load_factor - maxxes[current] = sorted_array[ - np.uint64(load_factor - 1), current - ] - maxes_counter += 1 + ( + sorted_array, + positions_array, + lengths, + maxxes, + maxxes_counter, + ) = _expand_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + lengths=lengths, + maxxes=maxxes, + posn=posn, + maxxes_counter=maxxes_counter, + load_factor=load_factor, + ) l_region = left_regions[_indexer, 0] - arr = maxxes[:maxes_counter] + arr = maxxes[:maxxes_counter] posn = _numba_less_than(arr=arr, value=l_region) if posn == -1: end = start @@ -1560,7 +1565,7 @@ def _numba_non_equi_join_not_monotonic_keep_first( elif r_index < base_index: base_index = r_index # step into the remaining columns - for ind in range(posn + 1, maxes_counter): + for ind in range(posn + 1, maxxes_counter): ind_ = np.uint64(ind) len_arr = lengths[ind_] # step into the rows for each column @@ -1633,7 +1638,7 @@ def _numba_non_equi_join_not_monotonic_keep_last( region = right_regions[np.uint64(end), 0] sorted_array[0, 0] = region positions_array[0, 0] = end - maxes_counter = 1 + maxxes_counter = 1 maxxes[0] = region lengths[0] = 1 r_count = 0 @@ -1646,10 +1651,10 @@ def 
_numba_non_equi_join_not_monotonic_keep_last( for num in range(start, end): _num = np.uint64(num) region = right_regions[_num, 0] - arr = maxxes[:maxes_counter] + arr = maxxes[:maxxes_counter] posn = _numba_less_than(arr=arr, value=region) if posn == -1: - posn = maxes_counter - 1 + posn = maxxes_counter - 1 posn_ = np.uint64(posn) len_arr = lengths[posn_] len_arr_ = np.uint64(len_arr) @@ -1658,55 +1663,42 @@ def _numba_non_equi_join_not_monotonic_keep_last( maxxes[posn_] = region lengths[posn_] += 1 else: - # the sorted array is an adaptation - # of grantjenks' sortedcontainers - posn_ = np.uint64(posn) - len_arr = lengths[posn_] - arr = sorted_array[:len_arr, posn_] - insort_posn = _numba_less_than(arr=arr, value=region) - # shift downwards before inserting - for ind in range(len_arr - 1, insort_posn - 1, -1): - ind_ = np.uint64(ind) - _ind = np.uint64(ind + 1) - sorted_array[_ind, posn_] = sorted_array[ind_, posn_] - positions_array[_ind, posn_] = positions_array[ind_, posn_] - insort = np.uint64(insort_posn) - sorted_array[insort, posn_] = region - positions_array[insort, posn_] = num - lengths[posn_] += 1 - maxxes[posn_] = sorted_array[np.uint64(len_arr), posn_] + sorted_array, positions_array, lengths, maxxes = ( + _numba_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + maxxes=maxxes, + lengths=lengths, + region=region, + posn=posn, + num=num, + ) + ) r_count += 1 posn_ = np.uint64(posn) + # have we exceeded the size of this column? + # do we need to trim and move data to other columns? 
check = (lengths[posn_] == (load_factor * 2)) & ( r_count < right_index.size ) if check: - # shift from left+1 to right - for pos in range(maxes_counter - 1, posn, -1): - forward = np.uint64(pos + 1) - current = np.uint64(pos) - sorted_array[:, forward] = sorted_array[:, current] - positions_array[:, forward] = positions_array[:, current] - maxxes[forward] = maxxes[current] - lengths[forward] = lengths[current] - # share half the load from left to left+1 - forward = np.uint64(posn + 1) - current = np.uint64(posn) - maxxes[forward] = sorted_array[-1, current] - lengths[forward] = load_factor - sorted_array[:load_factor, forward] = sorted_array[ - load_factor:, current - ] - positions_array[:load_factor, forward] = positions_array[ - load_factor:, current - ] - lengths[current] = load_factor - maxxes[current] = sorted_array[ - np.uint64(load_factor - 1), current - ] - maxes_counter += 1 + ( + sorted_array, + positions_array, + lengths, + maxxes, + maxxes_counter, + ) = _expand_sorted_array( + sorted_array=sorted_array, + positions_array=positions_array, + lengths=lengths, + maxxes=maxxes, + posn=posn, + maxxes_counter=maxxes_counter, + load_factor=load_factor, + ) l_region = left_regions[_indexer, 0] - arr = maxxes[:maxes_counter] + arr = maxxes[:maxxes_counter] posn = _numba_less_than(arr=arr, value=l_region) if posn == -1: end = start @@ -1740,7 +1732,7 @@ def _numba_non_equi_join_not_monotonic_keep_last( elif r_index > base_index: base_index = r_index # step into the remaining columns - for ind in range(posn + 1, maxes_counter): + for ind in range(posn + 1, maxxes_counter): ind_ = np.uint64(ind) len_arr = lengths[ind_] # step into the rows for each column @@ -1791,7 +1783,74 @@ def _numba_non_equi_join_not_monotonic_keep_last( @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_keep_first( +def _numba_non_equi_join_monotonic_decreasing_keep_all( + left_regions: np.ndarray, + right_regions: np.ndarray, + left_index: np.ndarray, + right_index: 
np.ndarray, + starts: np.ndarray, + ends: np.ndarray, +): + """ + Get indices for a non equi join. + """ + length = left_index.size + total = 0 + l_booleans = np.zeros(length, dtype=np.bool_) + # first pass - get actual length + for ind in prange(length): + _ind = np.uint64(ind) + start = starts[_ind] + end = ends[_ind] + for num in range(start, end): + _num = np.uint64(num) + counter = 1 + for loc in range(right_regions.shape[1]): + loc_ = np.uint64(loc) + next_left = left_regions[_ind, loc_] + next_right = right_regions[_num, loc_] + if next_left > next_right: + counter = 0 + break + if counter == 0: + continue + total += 1 + l_booleans[_ind] = True + if total == 0: + return None, None + n = 0 + left_indices = np.empty(total, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + # second pass - fill in values + for ind in range(length): + _ind = np.uint64(ind) + if not l_booleans[_ind]: + continue + start = starts[_ind] + end = ends[_ind] + lindex = left_index[_ind] + for num in range(start, end): + _num = np.uint64(num) + counter = 1 + for loc in range(right_regions.shape[1]): + loc_ = np.uint64(loc) + next_left = left_regions[_ind, loc_] + next_right = right_regions[_num, loc_] + if next_left > next_right: + counter = 0 + break + if counter == 0: + continue + rindex = right_index[_num] + _n = np.uint64(n) + left_indices[_n] = lindex + right_indices[_n] = rindex + n += 1 + return left_indices, right_indices + + +@njit(cache=True, parallel=True) +def _numba_non_equi_join_monotonic_decreasing_keep_first( left_regions: np.ndarray, right_regions: np.ndarray, left_index: np.ndarray, @@ -1806,6 +1865,7 @@ def _numba_non_equi_join_monotonic_keep_first( total = 0 l_booleans = np.zeros(length, dtype=np.bool_) r_indices = np.empty(length, dtype=np.intp) + # first pass - get actual length for ind in prange(length): _ind = np.uint64(ind) start = starts[_ind] @@ -1840,6 +1900,7 @@ def _numba_non_equi_join_monotonic_keep_first( n = 0 left_indices = np.empty(total, 
dtype=np.intp) right_indices = np.empty(total, dtype=np.intp) + # second pass - fill in values for ind in prange(length): _ind = np.uint64(ind) if not l_booleans[_ind]: @@ -1852,24 +1913,26 @@ def _numba_non_equi_join_monotonic_keep_first( @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_first( +def _numba_non_equi_join_monotonic_decreasing_keep_last( left_regions: np.ndarray, right_regions: np.ndarray, left_index: np.ndarray, right_index: np.ndarray, starts: np.ndarray, + ends: np.ndarray, ): """ - Get indices for a non equi join - first match + Get indices for a non equi join - last match. """ length = left_index.size - end = len(right_regions) total = 0 l_booleans = np.zeros(length, dtype=np.bool_) r_indices = np.empty(length, dtype=np.intp) + # first pass - get actual length for ind in prange(length): _ind = np.uint64(ind) start = starts[_ind] + end = ends[_ind] matches = 0 base = -1 for num in range(start, end): @@ -1888,7 +1951,7 @@ def _numba_non_equi_join_monotonic_increasing_keep_first( if matches == 0: base = rindex matches = 1 - elif rindex < base: + elif rindex > base: base = rindex if matches == 0: continue @@ -1900,6 +1963,7 @@ def _numba_non_equi_join_monotonic_increasing_keep_first( n = 0 left_indices = np.empty(total, dtype=np.intp) right_indices = np.empty(total, dtype=np.intp) + # second pass - fill in values for ind in prange(length): _ind = np.uint64(ind) if not l_booleans[_ind]: @@ -1912,25 +1976,25 @@ def _numba_non_equi_join_monotonic_increasing_keep_first( @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_keep_last( +def _numba_non_equi_join_monotonic_increasing_keep_first( left_regions: np.ndarray, right_regions: np.ndarray, left_index: np.ndarray, right_index: np.ndarray, starts: np.ndarray, - ends: np.ndarray, ): """ - Get indices for a non equi join - last match. 
+ Get indices for a non equi join - first match """ length = left_index.size + end = len(right_regions) total = 0 l_booleans = np.zeros(length, dtype=np.bool_) r_indices = np.empty(length, dtype=np.intp) + # first pass - get actual length for ind in prange(length): _ind = np.uint64(ind) start = starts[_ind] - end = ends[_ind] matches = 0 base = -1 for num in range(start, end): @@ -1949,7 +2013,7 @@ def _numba_non_equi_join_monotonic_keep_last( if matches == 0: base = rindex matches = 1 - elif rindex > base: + elif rindex < base: base = rindex if matches == 0: continue @@ -1961,6 +2025,7 @@ def _numba_non_equi_join_monotonic_keep_last( n = 0 left_indices = np.empty(total, dtype=np.intp) right_indices = np.empty(total, dtype=np.intp) + # second pass - fill in actual values for ind in prange(length): _ind = np.uint64(ind) if not l_booleans[_ind]: @@ -1988,6 +2053,7 @@ def _numba_non_equi_join_monotonic_increasing_keep_last( total = 0 l_booleans = np.zeros(length, dtype=np.bool_) r_indices = np.empty(length, dtype=np.intp) + # first pass - get actual length for ind in prange(length): _ind = np.uint64(ind) start = starts[_ind] @@ -2021,6 +2087,7 @@ def _numba_non_equi_join_monotonic_increasing_keep_last( n = 0 left_indices = np.empty(total, dtype=np.intp) right_indices = np.empty(total, dtype=np.intp) + # second pass - fill in values for ind in prange(length): _ind = np.uint64(ind) if not l_booleans[_ind]: @@ -2033,31 +2100,67 @@ def _numba_non_equi_join_monotonic_increasing_keep_last( @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_keep_all_dual( +def _numba_non_equi_join_monotonic_increasing_keep_all( + left_regions: np.ndarray, + right_regions: np.ndarray, left_index: np.ndarray, right_index: np.ndarray, starts: np.ndarray, - ends: np.ndarray, - start_indices: np.ndarray, - left_indices: np.ndarray, - right_indices: np.ndarray, ): """ - Get indices for a dual non equi join + Get indices for a non equi join. 
""" - for ind in prange(left_index.size): + length = left_index.size + end = len(right_regions) + total = 0 + l_booleans = np.zeros(length, dtype=np.bool_) + # first pass - get actual length + for ind in prange(length): _ind = np.uint64(ind) start = starts[_ind] - end = ends[_ind] - indexer = start_indices[_ind] + for num in range(start, end): + _num = np.uint64(num) + counter = 1 + for loc in range(right_regions.shape[1]): + loc_ = np.uint64(loc) + next_left = left_regions[_ind, loc_] + next_right = right_regions[_num, loc_] + if next_left > next_right: + counter = 0 + break + if counter == 0: + continue + total += 1 + l_booleans[_ind] = True + if total == 0: + return None, None + n = 0 + left_indices = np.empty(total, dtype=np.intp) + right_indices = np.empty(total, dtype=np.intp) + # second pass - fill in values + for ind in range(length): + _ind = np.uint64(ind) + if not l_booleans[_ind]: + continue + start = starts[_ind] lindex = left_index[_ind] for num in range(start, end): _num = np.uint64(num) + counter = 1 + for loc in range(right_regions.shape[1]): + loc_ = np.uint64(loc) + next_left = left_regions[_ind, loc_] + next_right = right_regions[_num, loc_] + if next_left > next_right: + counter = 0 + break + if counter == 0: + continue rindex = right_index[_num] - _indexer = np.uint64(indexer) - left_indices[_indexer] = lindex - right_indices[_indexer] = rindex - indexer += 1 + _n = np.uint64(n) + left_indices[_n] = lindex + right_indices[_n] = rindex + n += 1 return left_indices, right_indices @@ -2090,21 +2193,20 @@ def _numba_non_equi_join_monotonic_increasing_keep_all_dual( @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_keep_first_dual( +def _numba_non_equi_join_monotonic_increasing_keep_first_dual( left_index: np.ndarray, right_index: np.ndarray, starts: np.ndarray, - ends: np.ndarray, left_indices: np.ndarray, right_indices: np.ndarray, ): """ Get indices for a dual non equi join """ + end = right_index.size for ind in 
prange(left_index.size): _ind = np.uint64(ind) start = starts[_ind] - end = ends[_ind] lindex = left_index[_ind] base_index = right_index[np.uint64(start)] for num in range(start, end): @@ -2118,7 +2220,7 @@ def _numba_non_equi_join_monotonic_keep_first_dual( @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_first_dual( +def _numba_non_equi_join_monotonic_increasing_keep_last_dual( left_index: np.ndarray, right_index: np.ndarray, starts: np.ndarray, @@ -2137,7 +2239,7 @@ def _numba_non_equi_join_monotonic_increasing_keep_first_dual( for num in range(start, end): _num = np.uint64(num) rindex = right_index[_num] - if rindex < base_index: + if rindex > base_index: base_index = rindex left_indices[_ind] = lindex right_indices[_ind] = base_index @@ -2145,11 +2247,12 @@ def _numba_non_equi_join_monotonic_increasing_keep_first_dual( @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_keep_last_dual( +def _numba_non_equi_join_monotonic_decreasing_keep_all_dual( left_index: np.ndarray, right_index: np.ndarray, starts: np.ndarray, ends: np.ndarray, + start_indices: np.ndarray, left_indices: np.ndarray, right_indices: np.ndarray, ): @@ -2160,39 +2263,40 @@ def _numba_non_equi_join_monotonic_keep_last_dual( _ind = np.uint64(ind) start = starts[_ind] end = ends[_ind] + indexer = start_indices[_ind] lindex = left_index[_ind] - base_index = right_index[np.uint64(start)] for num in range(start, end): _num = np.uint64(num) rindex = right_index[_num] - if rindex > base_index: - base_index = rindex - left_indices[_ind] = lindex - right_indices[_ind] = base_index + _indexer = np.uint64(indexer) + left_indices[_indexer] = lindex + right_indices[_indexer] = rindex + indexer += 1 return left_indices, right_indices @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_last_dual( +def _numba_non_equi_join_monotonic_decreasing_keep_first_dual( left_index: np.ndarray, right_index: np.ndarray, starts: 
np.ndarray, + ends: np.ndarray, left_indices: np.ndarray, right_indices: np.ndarray, ): """ Get indices for a dual non equi join """ - end = right_index.size for ind in prange(left_index.size): _ind = np.uint64(ind) start = starts[_ind] + end = ends[_ind] lindex = left_index[_ind] base_index = right_index[np.uint64(start)] for num in range(start, end): _num = np.uint64(num) rindex = right_index[_num] - if rindex > base_index: + if rindex < base_index: base_index = rindex left_indices[_ind] = lindex right_indices[_ind] = base_index @@ -2200,128 +2304,119 @@ def _numba_non_equi_join_monotonic_increasing_keep_last_dual( @njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_keep_all( - left_regions: np.ndarray, - right_regions: np.ndarray, +def _numba_non_equi_join_monotonic_decreasing_keep_last_dual( left_index: np.ndarray, right_index: np.ndarray, starts: np.ndarray, ends: np.ndarray, + left_indices: np.ndarray, + right_indices: np.ndarray, ): """ - Get indices for a non equi join. 
+ Get indices for a dual non equi join """ - length = left_index.size - total = 0 - l_booleans = np.zeros(length, dtype=np.bool_) - for ind in prange(length): + for ind in prange(left_index.size): _ind = np.uint64(ind) start = starts[_ind] end = ends[_ind] - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - total += 1 - l_booleans[_ind] = True - if total == 0: - return None, None - n = 0 - left_indices = np.empty(total, dtype=np.intp) - right_indices = np.empty(total, dtype=np.intp) - for ind in range(length): - _ind = np.uint64(ind) - if not l_booleans[_ind]: - continue - start = starts[_ind] - end = ends[_ind] lindex = left_index[_ind] + base_index = right_index[np.uint64(start)] for num in range(start, end): _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue rindex = right_index[_num] - _n = np.uint64(n) - left_indices[_n] = lindex - right_indices[_n] = rindex - n += 1 + if rindex > base_index: + base_index = rindex + left_indices[_ind] = lindex + right_indices[_ind] = base_index return left_indices, right_indices -@njit(cache=True, parallel=True) -def _numba_non_equi_join_monotonic_increasing_keep_all( - left_regions: np.ndarray, - right_regions: np.ndarray, - left_index: np.ndarray, - right_index: np.ndarray, - starts: np.ndarray, +@njit +def _numba_sorted_array( + sorted_array: np.ndarray, + positions_array: np.ndarray, + maxxes: np.ndarray, + lengths: np.ndarray, + region: int, + posn: int, + num: int, +) -> tuple: + """ + Adaptation of grantjenks' sortedcontainers. 
+ + Args: + sorted_array: array of regions to keep in sorted order. + positions_array: positions of regions in the sorted_array. + maxxes: array of max values per column in the sorted_array. + lengths: array of lengths per column in the sorted_array. + region: integer to insert into sorted_array. + posn: binary search position of region in maxxes array. + Determines which column in the sorted_array + the region will go to. + num: position of region in right_regions array. + Inserted into positions_array to keep + in sync with the region in the sorted_array. + """ + # the sorted array is an adaptation + # of grantjenks' sortedcontainers + posn_ = np.uint64(posn) + len_arr = lengths[posn_] + # grab the specific column that the region falls into + arr = sorted_array[:len_arr, posn_] + # get the insertion position for the region + insort_posn = _numba_less_than(arr=arr, value=region) + # make space for the region + # shift downwards before inserting + # shift in this order to avoid issues with assignment override + # which could create wrong values + for ind in range(len_arr - 1, insort_posn - 1, -1): + ind_ = np.uint64(ind) + _ind = np.uint64(ind + 1) + sorted_array[_ind, posn_] = sorted_array[ind_, posn_] + positions_array[_ind, posn_] = positions_array[ind_, posn_] + # now we can safely insert the region + insort = np.uint64(insort_posn) + sorted_array[insort, posn_] = region + positions_array[insort, posn_] = num + # update the length and the maxxes arrays + lengths[posn_] += 1 + maxxes[posn_] = sorted_array[np.uint64(len_arr), posn_] + return sorted_array, positions_array, lengths, maxxes + + +@njit +def _expand_sorted_array( + sorted_array: np.ndarray, + positions_array: np.ndarray, + lengths: np.ndarray, + maxxes: np.ndarray, + posn: int, + maxxes_counter: int, + load_factor: int, ): """ - Get indices for a non equi join. + Expand sorted_array if it exceeds load_factor * 2 + Adapted from grantjenks' sortedcontainers. 
""" - length = left_index.size - end = len(right_regions) - total = 0 - l_booleans = np.zeros(length, dtype=np.bool_) - for ind in prange(length): - _ind = np.uint64(ind) - start = starts[_ind] - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - total += 1 - l_booleans[_ind] = True - if total == 0: - return None, None - n = 0 - left_indices = np.empty(total, dtype=np.intp) - right_indices = np.empty(total, dtype=np.intp) - for ind in range(length): - _ind = np.uint64(ind) - if not l_booleans[_ind]: - continue - start = starts[_ind] - lindex = left_index[_ind] - for num in range(start, end): - _num = np.uint64(num) - counter = 1 - for loc in range(right_regions.shape[1]): - loc_ = np.uint64(loc) - next_left = left_regions[_ind, loc_] - next_right = right_regions[_num, loc_] - if next_left > next_right: - counter = 0 - break - if counter == 0: - continue - rindex = right_index[_num] - _n = np.uint64(n) - left_indices[_n] = lindex - right_indices[_n] = rindex - n += 1 - return left_indices, right_indices + # shift from left+1 to right + for pos in range(maxxes_counter - 1, posn, -1): + forward = np.uint64(pos + 1) + current = np.uint64(pos) + sorted_array[:, forward] = sorted_array[:, current] + positions_array[:, forward] = positions_array[:, current] + maxxes[forward] = maxxes[current] + lengths[forward] = lengths[current] + # share half the load from left to left+1 + forward = np.uint64(posn + 1) + current = np.uint64(posn) + maxxes[forward] = sorted_array[-1, current] + lengths[forward] = load_factor + sorted_array[:load_factor, forward] = sorted_array[load_factor:, current] + positions_array[:load_factor, forward] = positions_array[ + load_factor:, current + ] + # update the length and maxxes arrays + lengths[current] 
= load_factor + maxxes[current] = sorted_array[np.uint64(load_factor - 1), current] + maxxes_counter += 1 + return sorted_array, positions_array, lengths, maxxes, maxxes_counter