From ebc7792001874f9493634643fe6e628ce818711c Mon Sep 17 00:00:00 2001 From: Dmitry Kurtaev Date: Thu, 24 Sep 2020 15:08:21 +0000 Subject: [PATCH 1/8] Intel OpenVINO backend --- README.md | 17 +++- bonito/cli/basecaller.py | 2 + bonito/cli/evaluate.py | 3 +- bonito/crf/basecall.py | 5 +- bonito/crf/model.py | 67 ++++++++++++++- bonito/ctc/basecall.py | 3 +- bonito/openvino/loader.py | 31 +++++++ bonito/openvino/model.py | 166 ++++++++++++++++++++++++++++++++++++++ bonito/util.py | 15 ++-- setup.py | 3 + 10 files changed, 296 insertions(+), 16 deletions(-) create mode 100644 bonito/openvino/loader.py create mode 100644 bonito/openvino/model.py diff --git a/README.md b/README.md index 77a9c273..87aa26f9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Bonito -[![PyPI version](https://badge.fury.io/py/ont-bonito.svg)](https://badge.fury.io/py/ont-bonito) +[![PyPI version](https://badge.fury.io/py/ont-bonito.svg)](https://badge.fury.io/py/ont-bonito) [![py36](https://img.shields.io/badge/python-3.6-brightgreen.svg)](https://img.shields.io/badge/python-3.6-brightgreen.svg) [![py37](https://img.shields.io/badge/python-3.7-brightgreen.svg)](https://img.shields.io/badge/python-3.7-brightgreen.svg) [![py38](https://img.shields.io/badge/python-3.8-brightgreen.svg)](https://img.shields.io/badge/python-3.8-brightgreen.svg) @@ -35,6 +35,12 @@ The default `ont-bonito` package is built against CUDA 10.2 however CUDA 11.1 an $ pip install -f https://download.pytorch.org/whl/torch_stable.html ont-bonito-cuda111 ``` +To optimize inference on CPU with Intel OpenVINO, use the `--use_openvino` flag: + +```bash +$ bonito basecaller dna_r9.4.1 --reference reference.mmi --use_openvino --device=cpu /data/reads > basecalls.sam +``` + ## Modified Bases Modified base calling is handled by [Remora](https://github.com/nanoporetech/remora). @@ -54,7 +60,7 @@ $ bonito basecaller dna_r9.4.1 --save-ctc --reference reference.mmi /data/reads $ bonito train --directory /data/training/ctc-data /data/training/model-dir ``` -In addition to training a new model from scratch you can also easily fine tune one of the pretrained models. +In addition to training a new model from scratch you can also easily fine tune one of the pretrained models. ```bash bonito train --epochs 1 --lr 5e-4 --pretrained dna_r10.4_e8.1_sup@v3.4 --directory /data/training/ctc-data /data/training/fine-tuned-model ``` @@ -67,7 +73,7 @@ $ bonito download --training $ bonito train /data/training/model-dir ``` -All training calls use Automatic Mixed Precision to speed up training. To disable this, set the `--no-amp` flag to True. +All training calls use Automatic Mixed Precision to speed up training. To disable this, set the `--no-amp` flag to True. ## Developer Quickstart ```bash $ git clone https://github.com/nanoporetech/bonito.git $ cd bonito $ python3 -m venv venv3 $ source venv3/bin/activate (venv3) $ python setup.py develop ``` +To build with the OpenVINO backend: +```bash +(venv3) $ pip install -e .[openvino] +``` + ## Interface - `bonito view` - view a model architecture for a given `.toml` file and the number of parameters in the network.
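The README additions above cover the CLI path. As a point of reference before the code diffs below, here is a rough sketch of the same flow driven from Python once the whole series is applied. It is illustrative only, not part of the patch: `model_dir` and `reads` are placeholders you supply, and `bonito.openvino.basecall` only exists from patch 2 of this series onward.

```python
# Sketch (not in the patch): CPU basecalling through the OpenVINO backend.
# Assumes `model_dir` is a downloaded bonito model directory and `reads`
# is an iterable of bonito read objects from your own reader.
from bonito.util import load_model
from bonito.openvino.basecall import basecall

model = load_model(model_dir, device="cpu", use_openvino=True)

# basecall() chunks each read's signal, batches the chunks through the
# OpenVINO network, stitches the results, and decodes with beam search.
for read, result in basecall(model, reads, chunksize=4000, overlap=100, batchsize=32):
    print(read.read_id, result["sequence"])
```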
diff --git a/bonito/cli/basecaller.py b/bonito/cli/basecaller.py index d2e7bdf4..58833649 100644 --- a/bonito/cli/basecaller.py +++ b/bonito/cli/basecaller.py @@ -40,6 +40,7 @@ def main(args): batchsize=args.batchsize, quantize=args.quantize, use_koi=True, + use_openvino=args.use_openvino, ) except FileNotFoundError: sys.stderr.write(f"> error: failed to load {args.model_directory}\n") @@ -172,4 +173,5 @@ def argparser(): parser.add_argument("--batchsize", default=None, type=int) parser.add_argument("--max-reads", default=0, type=int) parser.add_argument('-v', '--verbose', action='count', default=0) + parser.add_argument("--use_openvino", action="store_true", default=False) return parser diff --git a/bonito/cli/evaluate.py b/bonito/cli/evaluate.py index c81775c3..4d568793 100644 --- a/bonito/cli/evaluate.py +++ b/bonito/cli/evaluate.py @@ -45,7 +45,7 @@ def main(args): seqs = [] print("* loading model", w) - model = load_model(args.model_directory, args.device, weights=w) + model = load_model(args.model_directory, args.device, weights=w, use_openvino=args.use_openvino) print("* calling") t0 = time.perf_counter() @@ -109,4 +109,5 @@ def argparser(): parser.add_argument("--beamsize", default=5, type=int) parser.add_argument("--poa", action="store_true", default=False) parser.add_argument("--min-coverage", default=0.5, type=float) + parser.add_argument("--use_openvino", action="store_true", default=False) return parser diff --git a/bonito/crf/basecall.py b/bonito/crf/basecall.py index 2d037f4a..7abae212 100644 --- a/bonito/crf/basecall.py +++ b/bonito/crf/basecall.py @@ -28,12 +28,13 @@ def compute_scores(model, batch, beam_width=32, beam_cut=100.0, scale=1.0, offse """ with torch.inference_mode(): device = next(model.parameters()).device - dtype = torch.float16 if half_supported() else torch.float32 + dtype = torch.float16 if device != torch.device('cpu') and half_supported() else torch.float32 scores = model(batch.to(dtype).to(device)) if reverse: scores = model.seqdist.reverse_complement(scores) + # beam_search expects scores in FP16 precision sequence, qstring, moves = beam_search( - scores, beam_width=beam_width, beam_cut=beam_cut, + scores.to(torch.float16), beam_width=beam_width, beam_cut=beam_cut, scale=scale, offset=offset, blank_score=blank_score ) return { diff --git a/bonito/crf/model.py b/bonito/crf/model.py index 615dd432..10359600 100644 --- a/bonito/crf/model.py +++ b/bonito/crf/model.py @@ -6,8 +6,9 @@ import numpy as np from bonito.nn import Module, Convolution, LinearCRFEncoder, Serial, Permute, layers, from_dict -import seqdist.sparse -from seqdist.ctc_simple import logZ_cupy, viterbi_alignments +if torch.cuda.is_available(): + import seqdist.sparse + from seqdist.ctc_simple import logZ_cupy, viterbi_alignments from seqdist.core import SequenceDist, Max, Log, semiring @@ -21,6 +22,58 @@ def get_stride(m): return 1 +def logZ_fwd_cpu(Ms, idx, v0, vT, S): + T, N, C, NZ = Ms.shape + Ms_grad = torch.zeros(T, N, C, NZ) + + a = v0 + for t in range(T): + s = S.mul(a[:, idx], Ms[t]) + a = S.sum(s, -1) + Ms_grad[t] = s + return S.sum(a + vT, dim=1), Ms_grad + + +def logZ_bwd_cpu(Ms, idx, vT, S, K=1): + assert(K == 1) + T, N, C, NZ = Ms.shape + Ms = Ms.reshape(T, N, -1) + idx_T = idx.flatten().argsort().to(dtype=torch.long).reshape(C, NZ) + + betas = torch.ones(T + 1, N, C) + + a = vT + betas[T] = a + for t in reversed(range(T)): + s = S.mul(a[:, idx_T // NZ], Ms[t, :, idx_T]) + a = S.sum(s, -1) + betas[t] = a + return betas + + +class _LogZ(torch.autograd.Function): + @staticmethod 
+ def forward(ctx, Ms, idx, v0, vT, S:semiring): + idx = idx.to(dtype=torch.long, device=Ms.device) + logZ, Ms_grad = logZ_fwd_cpu(Ms, idx, v0, vT, S) + ctx.save_for_backward(Ms_grad, Ms, idx, vT) + ctx.semiring = S + return logZ + + @staticmethod + def backward(ctx, grad): + Ms_grad, Ms, idx, vT = ctx.saved_tensors + S = ctx.semiring + T, N, C, NZ = Ms.shape + betas = logZ_bwd_cpu(Ms, idx, vT, S) + Ms_grad = S.mul(Ms_grad, betas[1:,:,:,None]) + Ms_grad = S.dsum(Ms_grad.reshape(T, N, -1), dim=2).reshape(T, N, C, NZ) + return grad[None, :, None, None] * Ms_grad, None, None, None, None, None + +def sparse_logZ(Ms, idx, v0, vT, S:semiring=Log): + return _LogZ.apply(Ms, idx, v0, vT, S) + + class CTC_CRF(SequenceDist): def __init__(self, state_len, alphabet): @@ -43,7 +96,10 @@ def logZ(self, scores, S:semiring=Log): Ms = scores.reshape(T, N, -1, len(self.alphabet)) alpha_0 = Ms.new_full((N, self.n_base**(self.state_len)), S.one) beta_T = Ms.new_full((N, self.n_base**(self.state_len)), S.one) - return seqdist.sparse.logZ(Ms, self.idx, alpha_0, beta_T, S) + if not Ms.device.index is None: + return seqdist.sparse.logZ(Ms, self.idx, alpha_0, beta_T, S) + else: + return sparse_logZ(Ms, self.idx, alpha_0, beta_T, S) def normalise(self, scores): return (scores - self.logZ(scores)[:, None] / len(scores)) @@ -58,7 +114,10 @@ def backward_scores(self, scores, S: semiring=Log): T, N, _ = scores.shape Ms = scores.reshape(T, N, -1, self.n_base + 1) beta_T = Ms.new_full((N, self.n_base**(self.state_len)), S.one) - return seqdist.sparse.bwd_scores_cupy(Ms, self.idx, beta_T, S, K=1) + if not Ms.device.index is None: + return seqdist.sparse.bwd_scores_cupy(Ms, self.idx, beta_T, S, K=1) + else: + return logZ_bwd_cpu(Ms, self.idx, beta_T, S, K=1) def compute_transition_probs(self, scores, betas): T, N, C = scores.shape diff --git a/bonito/ctc/basecall.py b/bonito/ctc/basecall.py index 9b6f780a..92692156 100644 --- a/bonito/ctc/basecall.py +++ b/bonito/ctc/basecall.py @@ -35,7 +35,8 @@ def compute_scores(model, batch): """ with torch.no_grad(): device = next(model.parameters()).device - chunks = batch.to(torch.half).to(device) + chunks = batch.to(torch.half) if device != torch.device('cpu') and half_supported() else batch + chunks = chunks.to(device) probs = permute(model(chunks), 'TNC', 'NTC') return probs.cpu().to(torch.float32) diff --git a/bonito/openvino/loader.py b/bonito/openvino/loader.py new file mode 100644 index 00000000..e2ab6bbd --- /dev/null +++ b/bonito/openvino/loader.py @@ -0,0 +1,31 @@ +import torch.nn as nn + + +def convert_to_2d(model): + for name, l in model.named_children(): + layer_type = l.__class__.__name__ + if layer_type == 'Conv1d': + new_layer = nn.Conv2d(l.in_channels, l.out_channels, + (1, l.kernel_size[0]), (1, l.stride[0]), + (0, l.padding[0]), (1, l.dilation[0]), + l.groups, False if l.bias is None else True, l.padding_mode) + params = l.state_dict() + params['weight'] = params['weight'].unsqueeze(2) + new_layer.load_state_dict(params) + setattr(model, name, new_layer) + elif layer_type == 'BatchNorm1d': + new_layer = nn.BatchNorm2d(l.num_features, l.eps) + new_layer.load_state_dict(l.state_dict()) + new_layer.eval() + setattr(model, name, new_layer) + elif layer_type == 'Permute': + dims_2d = [] + # 1D to 2D: i.e. 
(2, 0, 1) -> (2, 3, 0, 1) + for d in l.dims: + assert(d <= 2) + dims_2d.append(d) + if d == 2: + dims_2d.append(3) + l.dims = dims_2d + else: + convert_to_2d(l) diff --git a/bonito/openvino/model.py b/bonito/openvino/model.py new file mode 100644 index 00000000..b2fa0f7c --- /dev/null +++ b/bonito/openvino/model.py @@ -0,0 +1,166 @@ +import os +import io +import numpy as np +import torch + +try: + from openvino.inference_engine import IECore, StatusCode + from .loader import convert_to_2d +except ImportError: + pass + + +def load_openvino_model(model, dirname): + package = model.config['model']['package'] + if package == 'bonito.ctc': + return OpenVINOCTCModel(model, dirname) + elif package == 'bonito.crf': + return OpenVINOCRFModel(model, dirname) + else: + raise Exception('Unknown model configuration: ' + package) + + +class OpenVINOModel: + + def __init__(self, model, dirname): + self.model = model + self.alphabet = model.alphabet + self.parameters = model.parameters + self.stride = model.stride + self.net = None + self.exec_net = None + self.dirname = dirname + self.ie = IECore() + + + def eval(self): + pass + + + def half(self): + return self + + + @property + def config(self): + return self.model.config + + + def to(self, device): + self.device = str(device).upper() + + """ + Call this method once to initialize executable network + """ + def init_model(self, model, inp_shape): + # First, we try to check if there is IR on disk. If not - load model in runtime + xml_path, bin_path = [os.path.join(self.dirname, 'model') + ext for ext in ['.xml', '.bin']] + if os.path.exists(xml_path) and os.path.exists(bin_path): + self.net = self.ie.read_network(xml_path, bin_path) + else: + # Convert model to ONNX buffer + buf = io.BytesIO() + inp = torch.randn(inp_shape) + torch.onnx.export(model, inp, buf, input_names=['input'], output_names=['output'], + opset_version=11) + + # Import network from memory buffer + self.net = self.ie.read_network(buf.getvalue(), b'', init_from_buffer=True) + + # Load model to device + config = {} + if self.device == 'CPU': + config={'CPU_THROUGHPUT_STREAMS': 'CPU_THROUGHPUT_AUTO'} + self.exec_net = self.ie.load_network(self.net, self.device, + config=config, num_requests=0) + + + def process(self, data): + data = data.float() + batch_size = data.shape[0] + inp_shape = list(data.shape) + inp_shape[0] = 1 # We will run the batch asynchronously + + # List that maps infer requests to index of processed chunk from batch. + # -1 means that request has not been started yet. 
+ infer_request_input_id = [-1] * len(self.exec_net.requests) + out_shape = self.net.outputs['output'].shape + # CTC network produces 1xWxNxC + output = np.zeros([out_shape[-3], batch_size, out_shape[-1]], dtype=np.float32) + + for inp_id in range(batch_size): + # Get idle infer request + infer_request_id = self.exec_net.get_idle_request_id() + if infer_request_id < 0: + status = self.exec_net.wait(num_requests=1) + if status != StatusCode.OK: + raise Exception("Wait for idle request failed!") + infer_request_id = self.exec_net.get_idle_request_id() + if infer_request_id < 0: + raise Exception("Invalid request id!") + + out_id = infer_request_input_id[infer_request_id] + request = self.exec_net.requests[infer_request_id] + + # Copy output prediction + if out_id != -1: + output[:,out_id:out_id+1] = request.output_blobs['output'].buffer + + # Start this request on new data + infer_request_input_id[infer_request_id] = inp_id + request.async_infer({'input': data[inp_id]}) + inp_id += 1 + + # Wait for the rest of requests + status = self.exec_net.wait() + if status != StatusCode.OK: + raise Exception("Wait for idle request failed!") + for infer_request_id, out_id in enumerate(infer_request_input_id): + if out_id == -1: + continue + request = self.exec_net.requests[infer_request_id] + output[:,out_id:out_id+1] = request.output_blobs['output'].buffer + + return torch.tensor(output) + + +class OpenVINOCTCModel(OpenVINOModel): + + def __init__(self, model, dirname): + super().__init__(model, dirname) + + + def __call__(self, data): + data = data.unsqueeze(2) # 1D->2D + if self.exec_net is None: + convert_to_2d(self.model) + self.init_model(self.model, [1, 1, 1, data.shape[-1]]) + + return self.process(data) + + + def decode(self, x, beamsize=5, threshold=1e-3, qscores=False, return_path=False): + return self.model.decode(x, beamsize=beamsize, threshold=threshold, + qscores=qscores, return_path=return_path) + + +class OpenVINOCRFModel(OpenVINOModel): + + def __init__(self, model, dirname): + super().__init__(model, dirname) + self.seqdist = model.seqdist + + + def __call__(self, data): + if self.exec_net is None: + self.init_model(self.model.encoder, [1, 1, data.shape[-1]]) + + return self.process(data) + + + def decode(self, x): + return self.model.decode(x) + + + def decode_batch(self, x): + return self.model.decode_batch(x) diff --git a/bonito/util.py b/bonito/util.py index dc971665..b1fed7a2 100644 --- a/bonito/util.py +++ b/bonito/util.py @@ -19,6 +19,7 @@ import parasail import numpy as np from torch.cuda import get_device_capability +from bonito.openvino.model import load_openvino_model try: from claragenomics.bindings import cuda @@ -46,7 +47,7 @@ def init(seed, device): random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) - if device == "cpu": return + if not device.startswith('cuda'): return torch.backends.cudnn.enabled = True torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False @@ -251,7 +252,7 @@ def match_names(state_dict, model): return OrderedDict([(k, remap[k]) for k in state_dict.keys()]) -def load_model(dirname, device, weights=None, half=None, chunksize=None, batchsize=None, overlap=None, quantize=False, use_koi=False): +def load_model(dirname, device, weights=None, half=None, chunksize=None, batchsize=None, overlap=None, quantize=False, use_koi=False, use_openvino=False): """ Load a model from disk """ @@ -264,7 +265,8 @@ def load_model(dirname, device, weights=None, half=None, chunksize=None, batchsi raise FileNotFoundError("no model weights 
found in '%s'" % dirname) weights = max([int(re.sub(".*_([0-9]+).tar", "\\1", w)) for w in weight_files]) - device = torch.device(device) + if not use_openvino: + device = torch.device(device) config = toml.load(os.path.join(dirname, 'config.toml')) weights = os.path.join(dirname, 'weights_%s.tar' % weights) @@ -285,7 +287,7 @@ def load_model(dirname, device, weights=None, half=None, chunksize=None, batchsi model.encoder, batchsize=batchsize, chunksize=chunksize // model.stride, quantize=quantize ) - state_dict = torch.load(weights, map_location=device) + state_dict = torch.load(weights, map_location=device if not use_openvino else 'cpu') state_dict = {k2: state_dict[k1] for k1, k2 in match_names(state_dict, model).items()} new_state_dict = OrderedDict() for k, v in state_dict.items(): @@ -294,7 +296,10 @@ def load_model(dirname, device, weights=None, half=None, chunksize=None, batchsi model.load_state_dict(new_state_dict) - if half is None: + if use_openvino: + model = load_openvino_model(model, dirname) + + if half is None and device != torch.device('cpu'): half = half_supported() if half: model = model.half() diff --git a/setup.py b/setup.py index 271fabd6..71304378 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,9 @@ packages=find_packages(), include_package_data=True, install_requires=requirements, + extras_require = { + 'openvino': ['openvino==2021.4.2'], + }, long_description=long_description, long_description_content_type='text/markdown', author='Oxford Nanopore Technologies, Ltd', From 45a5dd004553575ac498c8c5b0d1048798d06d9c Mon Sep 17 00:00:00 2001 From: Chris Seymour Date: Wed, 26 Jan 2022 16:54:57 +0000 Subject: [PATCH 2/8] full bonito.crf on openvino --- bonito/cli/basecaller.py | 6 +- bonito/crf/model.py | 58 +--------------- bonito/openvino/basecall.py | 47 +++++++++++++ bonito/openvino/loader.py | 31 --------- bonito/openvino/model.py | 131 ++++++++++++++++++++++++++---------- bonito/util.py | 4 +- 6 files changed, 154 insertions(+), 123 deletions(-) create mode 100644 bonito/openvino/basecall.py delete mode 100644 bonito/openvino/loader.py diff --git a/bonito/cli/basecaller.py b/bonito/cli/basecaller.py index d417939c..cb319d7d 100644 --- a/bonito/cli/basecaller.py +++ b/bonito/cli/basecaller.py @@ -12,6 +12,7 @@ from itertools import islice as take from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter +import bonito.openvino.basecall from bonito.aligner import align_map, Aligner from bonito.io import CTCWriter, Writer, biofmt from bonito.mod_util import call_mods, load_mods_model @@ -51,7 +52,10 @@ def main(args): if args.verbose: sys.stderr.write(f"> model basecaller params: {model.config['basecaller']}\n") - basecall = load_symbol(args.model_directory, "basecall") + if args.use_openvino: + basecall = bonito.openvino.basecall.basecall + else: + basecall = load_symbol(args.model_directory, "basecall") mods_model = None if args.modified_base_model is not None or args.modified_bases is not None: diff --git a/bonito/crf/model.py b/bonito/crf/model.py index 9487477f..b918b063 100644 --- a/bonito/crf/model.py +++ b/bonito/crf/model.py @@ -6,9 +6,8 @@ import numpy as np from bonito.nn import Module, Convolution, LinearCRFEncoder, Serial, Permute, layers, from_dict -if torch.cuda.is_available(): - import seqdist.sparse - from seqdist.ctc_simple import logZ_cupy, viterbi_alignments +import seqdist.sparse +from seqdist.ctc_simple import logZ_cupy, viterbi_alignments from seqdist.core import SequenceDist, Max, Log, semiring @@ -22,58 +21,6 @@ def get_stride(m): 
return 1 -def logZ_fwd_cpu(Ms, idx, v0, vT, S): - T, N, C, NZ = Ms.shape - Ms_grad = torch.zeros(T, N, C, NZ) - - a = v0 - for t in range(T): - s = S.mul(a[:, idx], Ms[t]) - a = S.sum(s, -1) - Ms_grad[t] = s - return S.sum(a + vT, dim=1), Ms_grad - - -def logZ_bwd_cpu(Ms, idx, vT, S, K=1): - assert(K == 1) - T, N, C, NZ = Ms.shape - Ms = Ms.reshape(T, N, -1) - idx_T = idx.flatten().argsort().to(dtype=torch.long).reshape(C, NZ) - - betas = torch.ones(T + 1, N, C) - - a = vT - betas[T] = a - for t in reversed(range(T)): - s = S.mul(a[:, idx_T // NZ], Ms[t, :, idx_T]) - a = S.sum(s, -1) - betas[t] = a - return betas - - -class _LogZ(torch.autograd.Function): - @staticmethod - def forward(ctx, Ms, idx, v0, vT, S:semiring): - idx = idx.to(dtype=torch.long, device=Ms.device) - logZ, Ms_grad = logZ_fwd_cpu(Ms, idx, v0, vT, S) - ctx.save_for_backward(Ms_grad, Ms, idx, vT) - ctx.semiring = S - return logZ - - @staticmethod - def backward(ctx, grad): - Ms_grad, Ms, idx, vT = ctx.saved_tensors - S = ctx.semiring - T, N, C, NZ = Ms.shape - betas = logZ_bwd_cpu(Ms, idx, vT, S) - Ms_grad = S.mul(Ms_grad, betas[1:,:,:,None]) - Ms_grad = S.dsum(Ms_grad.reshape(T, N, -1), dim=2).reshape(T, N, C, NZ) - return grad[None, :, None, None] * Ms_grad, None, None, None, None, None - -def sparse_logZ(Ms, idx, v0, vT, S:semiring=Log): - return _LogZ.apply(Ms, idx, v0, vT, S) - - class CTC_CRF(SequenceDist): def __init__(self, state_len, alphabet): @@ -235,6 +182,7 @@ def decode(self, x): def loss(self, scores, targets, target_lengths, **kwargs): return self.seqdist.ctc_loss(scores.to(torch.float32), targets, target_lengths, **kwargs) + class Model(SeqdistModel): def __init__(self, config): diff --git a/bonito/openvino/basecall.py b/bonito/openvino/basecall.py new file mode 100644 index 00000000..c7f79ff7 --- /dev/null +++ b/bonito/openvino/basecall.py @@ -0,0 +1,47 @@ +import torch +from crf_beam import beam_search +from bonito.crf.basecall import stitch_results +from bonito.multiprocessing import thread_iter, thread_map +from bonito.util import chunk, stitch, batchify, unbatchify + + +def compute_scores(model, batch): + scores = model(batch) + fwd = model.forward_scores(scores) + bwd = model.backward_scores(scores) + posts = torch.softmax(fwd + bwd, dim=-1) + return { + 'scores': scores.transpose(0, 1), + 'bwd': bwd.transpose(0, 1), + 'posts': posts.transpose(0, 1), + } + + +def decode(x, beam_width=32, beam_cut=100.0, scale=1.0, offset=0.0, blank_score=2.0): + sequence, qstring, moves = beam_search(x['scores'], x['bwd'], x['posts']) + return { + 'sequence': sequence, + 'qstring': qstring, + 'moves': moves, + } + + +def basecall(model, reads, chunksize=4000, overlap=100, batchsize=32, reverse=False): + + chunks = thread_iter( + ((read, 0, len(read.signal)), chunk(torch.from_numpy(read.signal), chunksize, overlap)) + for read in reads + ) + + batches = thread_iter(batchify(chunks, batchsize=batchsize)) + + scores = thread_iter( + (read, compute_scores(model, batch)) for read, batch in batches + ) + + results = thread_iter( + (read, stitch_results(scores, end - start, chunksize, overlap, model.stride)) + for ((read, start, end), scores) in unbatchify(scores) + ) + + return thread_map(decode, results, n_thread=48) diff --git a/bonito/openvino/loader.py b/bonito/openvino/loader.py deleted file mode 100644 index e2ab6bbd..00000000 --- a/bonito/openvino/loader.py +++ /dev/null @@ -1,31 +0,0 @@ -import torch.nn as nn - - -def convert_to_2d(model): - for name, l in model.named_children(): - layer_type = 
l.__class__.__name__ - if layer_type == 'Conv1d': - new_layer = nn.Conv2d(l.in_channels, l.out_channels, - (1, l.kernel_size[0]), (1, l.stride[0]), - (0, l.padding[0]), (1, l.dilation[0]), - l.groups, False if l.bias is None else True, l.padding_mode) - params = l.state_dict() - params['weight'] = params['weight'].unsqueeze(2) - new_layer.load_state_dict(params) - setattr(model, name, new_layer) - elif layer_type == 'BatchNorm1d': - new_layer = nn.BatchNorm2d(l.num_features, l.eps) - new_layer.load_state_dict(l.state_dict()) - new_layer.eval() - setattr(model, name, new_layer) - elif layer_type == 'Permute': - dims_2d = [] - # 1D to 2D: i.e. (2, 0, 1) -> (2, 3, 0, 1) - for d in l.dims: - assert(d <= 2) - dims_2d.append(d) - if d == 2: - dims_2d.append(3) - l.dims = dims_2d - else: - convert_to_2d(l) diff --git a/bonito/openvino/model.py b/bonito/openvino/model.py index b2fa0f7c..34ac4376 100644 --- a/bonito/openvino/model.py +++ b/bonito/openvino/model.py @@ -1,13 +1,12 @@ import os import io -import numpy as np import torch +import numpy as np +from collections import namedtuple +from torch.nn import Conv2d, BatchNorm2d -try: - from openvino.inference_engine import IECore, StatusCode - from .loader import convert_to_2d -except ImportError: - pass +try: from openvino.inference_engine import IECore, StatusCode +except ImportError: pass def load_openvino_model(model, dirname): @@ -20,6 +19,39 @@ def load_openvino_model(model, dirname): raise Exception('Unknown model configuration: ' + package) +def convert_to_2d(model): + for name, l in model.named_children(): + layer_type = l.__class__.__name__ + if layer_type == 'Conv1d': + new_layer = Conv2d( + l.in_channels, l.out_channels, + (1, l.kernel_size[0]), (1, l.stride[0]), + (0, l.padding[0]), (1, l.dilation[0]), + l.groups, False if l.bias is None else True, l.padding_mode + ) + params = l.state_dict() + params['weight'] = params['weight'].unsqueeze(2) + new_layer.load_state_dict(params) + setattr(model, name, new_layer) + elif layer_type == 'BatchNorm1d': + new_layer = BatchNorm2d(l.num_features, l.eps) + new_layer.load_state_dict(l.state_dict()) + new_layer.eval() + setattr(model, name, new_layer) + elif layer_type == 'Permute': + dims_2d = [] + # 1D to 2D: i.e. (2, 0, 1) -> (2, 3, 0, 1) + for d in l.dims: + assert(d <= 2) + dims_2d.append(d) + if d == 2: + dims_2d.append(3) + l.dims = dims_2d + else: + convert_to_2d(l) + + + class OpenVINOModel: def __init__(self, model, dirname): @@ -32,27 +64,23 @@ def __init__(self, model, dirname): self.dirname = dirname self.ie = IECore() - def eval(self): pass - def half(self): return self - @property def config(self): return self.model.config - def to(self, device): self.device = str(device).upper() - """ - Call this method once to initialize executable network - """ def init_model(self, model, inp_shape): + """ + Call this method once to initialize executable network + """ # First, we try to check if there is IR on disk. 
If not - load model in runtime xml_path, bin_path = [os.path.join(self.dirname, 'model') + ext for ext in ['.xml', '.bin']] if os.path.exists(xml_path) and os.path.exists(bin_path): @@ -61,8 +89,7 @@ def init_model(self, model, inp_shape): # Convert model to ONNX buffer buf = io.BytesIO() inp = torch.randn(inp_shape) - torch.onnx.export(model, inp, buf, input_names=['input'], output_names=['output'], - opset_version=11) + torch.onnx.export(model, inp, buf, input_names=['input'], output_names=['output'], opset_version=11) # Import network from memory buffer self.net = self.ie.read_network(buf.getvalue(), b'', init_from_buffer=True) @@ -71,11 +98,10 @@ def init_model(self, model, inp_shape): config = {} if self.device == 'CPU': config={'CPU_THROUGHPUT_STREAMS': 'CPU_THROUGHPUT_AUTO'} - self.exec_net = self.ie.load_network(self.net, self.device, - config=config, num_requests=0) - + self.exec_net = self.ie.load_network(self.net, self.device, config=config, num_requests=0) def process(self, data): + data = data.float() batch_size = data.shape[0] inp_shape = list(data.shape) @@ -85,6 +111,10 @@ def process(self, data): # -1 means that request has not been started yet. infer_request_input_id = [-1] * len(self.exec_net.requests) out_shape = self.net.outputs['output'].shape + + if len(out_shape) == 3: + out_shape = [1, *out_shape] + # CTC network produces 1xWxNxC output = np.zeros([out_shape[-3], batch_size, out_shape[-1]], dtype=np.float32) @@ -104,7 +134,7 @@ def process(self, data): # Copy output prediction if out_id != -1: - output[:,out_id:out_id+1] = request.output_blobs['output'].buffer + output[:, out_id:out_id + 1] = request.output_blobs['output'].buffer # Start this request on new data infer_request_input_id[infer_request_id] = inp_id @@ -115,11 +145,12 @@ def process(self, data): status = self.exec_net.wait() if status != StatusCode.OK: raise Exception("Wait for idle request failed!") + for infer_request_id, out_id in enumerate(infer_request_input_id): if out_id == -1: continue request = self.exec_net.requests[infer_request_id] - output[:,out_id:out_id+1] = request.output_blobs['output'].buffer + output[:, out_id:out_id + 1] = request.output_blobs['output'].buffer return torch.tensor(output) @@ -129,19 +160,49 @@ class OpenVINOCTCModel(OpenVINOModel): def __init__(self, model, dirname): super().__init__(model, dirname) - def __call__(self, data): data = data.unsqueeze(2) # 1D->2D if self.exec_net is None: convert_to_2d(self.model) self.init_model(self.model, [1, 1, 1, data.shape[-1]]) - return self.process(data) - def decode(self, x, beamsize=5, threshold=1e-3, qscores=False, return_path=False): - return self.model.decode(x, beamsize=beamsize, threshold=threshold, - qscores=qscores, return_path=return_path) + return self.model.decode( + x, beamsize=beamsize, threshold=threshold, qscores=qscores, return_path=return_path + ) + + +semiring = namedtuple('semiring', ('zero', 'one', 'mul', 'sum', 'dsum')) +Log = semiring(zero=-1e38, one=0., mul=torch.add, sum=torch.logsumexp, dsum=torch.softmax) + + +def logZ_fwd_cpu(Ms, idx, v0, S:semiring=Log): + + idx = idx.to(torch.int64) + T, N, C, nz = Ms.shape + alpha = Ms.new_full((T+1, N, C), S.zero) + alpha[0] = v0 + for t in range(T): + alpha[t+1] = S.sum(S.mul(Ms[t], alpha[t, :, idx]), dim=2) + return alpha + + +def logZ_bwd_cpu(Ms, idx, vT, S): + + T, N, C, NZ = Ms.shape + Ms = Ms.reshape(T, N, -1) + idx_T = idx.flatten().argsort().to(dtype=torch.long).reshape(C, NZ) + + betas = torch.ones(T + 1, N, C) + + a = vT + betas[T] = a + for t in 
reversed(range(T)): + s = S.mul(a[:, idx_T // NZ], Ms[t, :, idx_T]) + a = S.sum(s, dim=-1) + betas[t] = a + return betas class OpenVINOCRFModel(OpenVINOModel): @@ -150,17 +211,19 @@ def __init__(self, model, dirname): super().__init__(model, dirname) self.seqdist = model.seqdist - def __call__(self, data): if self.exec_net is None: self.init_model(self.model.encoder, [1, 1, data.shape[-1]]) - return self.process(data) - - def decode(self, x): - return self.model.decode(x) - - - def decode_batch(self, x): - return self.model.decode_batch(x) + def forward_scores(self, scores, S: semiring=Log): + T, N, _ = scores.shape + Ms = scores.reshape(T, N, -1, self.seqdist.n_base + 1) + v0 = Ms.new_full((N, self.seqdist.n_base**(self.seqdist.state_len)), S.one) + return logZ_fwd_cpu(Ms, self.seqdist.idx, v0, S) + + def backward_scores(self, scores, S: semiring=Log): + T, N, _ = scores.shape + Ms = scores.reshape(T, N, -1, self.seqdist.n_base + 1) + beta_T = Ms.new_full((N, self.seqdist.n_base**(self.seqdist.state_len)), S.one) + return logZ_bwd_cpu(Ms, self.seqdist.idx, beta_T, S) diff --git a/bonito/util.py b/bonito/util.py index 0a43c9be..aeb65813 100644 --- a/bonito/util.py +++ b/bonito/util.py @@ -282,7 +282,7 @@ def load_model(dirname, device, weights=None, half=None, chunksize=None, batchsi Model = load_symbol(config, "Model") model = Model(config) - if use_koi: + if use_koi and device != 'cpu': model.encoder = koi.lstm.update_graph( model.encoder, batchsize=batchsize, chunksize=chunksize // model.stride, quantize=quantize ) @@ -299,7 +299,7 @@ def load_model(dirname, device, weights=None, half=None, chunksize=None, batchsi if use_openvino: model = load_openvino_model(model, dirname) - if half is None and device != torch.device('cpu'): + if half is None and device != 'cpu': half = half_supported() if half: model = model.half() From cb5e5d8473099363cccab0b7e180d2ccce55fbbd Mon Sep 17 00:00:00 2001 From: Chris Seymour Date: Wed, 26 Jan 2022 16:58:44 +0000 Subject: [PATCH 3/8] cpu version in openvino module --- bonito/crf/model.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/bonito/crf/model.py b/bonito/crf/model.py index b918b063..48836a0a 100644 --- a/bonito/crf/model.py +++ b/bonito/crf/model.py @@ -43,10 +43,7 @@ def logZ(self, scores, S:semiring=Log): Ms = scores.reshape(T, N, -1, len(self.alphabet)) alpha_0 = Ms.new_full((N, self.n_base**(self.state_len)), S.one) beta_T = Ms.new_full((N, self.n_base**(self.state_len)), S.one) - if not Ms.device.index is None: - return seqdist.sparse.logZ(Ms, self.idx, alpha_0, beta_T, S) - else: - return sparse_logZ(Ms, self.idx, alpha_0, beta_T, S) + return seqdist.sparse.logZ(Ms, self.idx, alpha_0, beta_T, S) def normalise(self, scores): return (scores - self.logZ(scores)[:, None] / len(scores)) @@ -61,10 +58,7 @@ def backward_scores(self, scores, S: semiring=Log): T, N, _ = scores.shape Ms = scores.reshape(T, N, -1, self.n_base + 1) beta_T = Ms.new_full((N, self.n_base**(self.state_len)), S.one) - if not Ms.device.index is None: - return seqdist.sparse.bwd_scores_cupy(Ms, self.idx, beta_T, S, K=1) - else: - return logZ_bwd_cpu(Ms, self.idx, beta_T, S, K=1) + return seqdist.sparse.bwd_scores_cupy(Ms, self.idx, beta_T, S, K=1) def compute_transition_probs(self, scores, betas): T, N, C = scores.shape From 778c85d7bec16e33f8c4e3d1c79ba6d66e17e72d Mon Sep 17 00:00:00 2001 From: Chris Seymour Date: Mon, 31 Jan 2022 18:26:00 +0000 Subject: [PATCH 4/8] full seqdist interface --- bonito/nn.py | 2 +- bonito/openvino/basecall.py | 4 +- 
bonito/openvino/model.py | 96 ++++++++++++++++++++++++------------- 3 files changed, 65 insertions(+), 37 deletions(-) diff --git a/bonito/nn.py b/bonito/nn.py index 5c8f1a3d..7c7f6359 100644 --- a/bonito/nn.py +++ b/bonito/nn.py @@ -116,7 +116,7 @@ def forward(self, x): if self.blank_score is not None and self.expand_blanks: T, N, C = scores.shape scores = torch.nn.functional.pad( - scores.view(T, N, C // self.n_base, self.n_base), + scores.view(T, N, -1, self.n_base), (1, 0, 0, 0, 0, 0, 0, 0), value=self.blank_score ).view(T, N, -1) diff --git a/bonito/openvino/basecall.py b/bonito/openvino/basecall.py index c7f79ff7..b477d2f3 100644 --- a/bonito/openvino/basecall.py +++ b/bonito/openvino/basecall.py @@ -7,8 +7,8 @@ def compute_scores(model, batch): scores = model(batch) - fwd = model.forward_scores(scores) - bwd = model.backward_scores(scores) + fwd = model.seqdist.forward_scores(scores) + bwd = model.seqdist.backward_scores(scores) posts = torch.softmax(fwd + bwd, dim=-1) return { 'scores': scores.transpose(0, 1), diff --git a/bonito/openvino/model.py b/bonito/openvino/model.py index 34ac4376..82a2fb3f 100644 --- a/bonito/openvino/model.py +++ b/bonito/openvino/model.py @@ -1,8 +1,9 @@ import os -import io import torch import numpy as np +from io import BytesIO from collections import namedtuple +from bonito.crf.model import CTC_CRF from torch.nn import Conv2d, BatchNorm2d try: from openvino.inference_engine import IECore, StatusCode @@ -51,7 +52,6 @@ def convert_to_2d(model): convert_to_2d(l) - class OpenVINOModel: def __init__(self, model, dirname): @@ -87,7 +87,7 @@ def init_model(self, model, inp_shape): self.net = self.ie.read_network(xml_path, bin_path) else: # Convert model to ONNX buffer - buf = io.BytesIO() + buf = BytesIO() inp = torch.randn(inp_shape) torch.onnx.export(model, inp, buf, input_names=['input'], output_names=['output'], opset_version=11) @@ -173,57 +173,85 @@ def decode(self, x, beamsize=5, threshold=1e-3, qscores=False, return_path=False ) +def grad(f, x): + x = x.detach().requires_grad_() + with torch.enable_grad(): + y = f(x) + return torch.autograd.grad(y, x)[0].detach() + + +def max_grad(x, dim=0): + return torch.zeros_like(x).scatter_(dim, x.argmax(dim, True), 1.0) + + semiring = namedtuple('semiring', ('zero', 'one', 'mul', 'sum', 'dsum')) Log = semiring(zero=-1e38, one=0., mul=torch.add, sum=torch.logsumexp, dsum=torch.softmax) +Max = semiring(zero=-1e38, one=0., mul=torch.add, sum=(lambda x, dim=0: torch.max(x, dim=dim)[0]), dsum=max_grad) -def logZ_fwd_cpu(Ms, idx, v0, S:semiring=Log): - - idx = idx.to(torch.int64) - T, N, C, nz = Ms.shape - alpha = Ms.new_full((T+1, N, C), S.zero) +def scan(Ms, idx, v0, S:semiring=Log): + T, N, C, NZ = Ms.shape + alpha = Ms.new_full((T + 1, N, C), S.zero) alpha[0] = v0 for t in range(T): - alpha[t+1] = S.sum(S.mul(Ms[t], alpha[t, :, idx]), dim=2) + alpha[t+1] = S.sum(S.mul(Ms[t], alpha[t, :, idx]), dim=-1) return alpha -def logZ_bwd_cpu(Ms, idx, vT, S): +class LogZ(torch.autograd.Function): - T, N, C, NZ = Ms.shape - Ms = Ms.reshape(T, N, -1) - idx_T = idx.flatten().argsort().to(dtype=torch.long).reshape(C, NZ) + @staticmethod + def forward(ctx, Ms, idx, v0, vT, scan, S:semiring): + alpha = scan(Ms, idx, v0, S) + ctx.save_for_backward(alpha, Ms, idx, vT) + ctx.semiring, ctx.scan = S, scan + return S.sum(S.mul(alpha[-1], vT), dim=1) + + @staticmethod + def backward(ctx, grad): + alpha, Ms, idx, vT = ctx.saved_tensors + S, scan = ctx.semiring, ctx.scan + T, N, C, NZ = Ms.shape + idx_T = 
idx.flatten().argsort().reshape(*idx.shape) + Ms_T = Ms.reshape(T, N, -1)[:, :, idx_T] + idx_T = torch.div(idx_T, NZ, rounding_mode='floor') + beta = scan(Ms_T.flip(0), idx_T, vT, S) + g = S.mul(S.mul(Ms.reshape(T, N, -1), alpha[:-1, :, idx.flatten()]).reshape(T, N, C, NZ), beta[:-1, :, :, None].flip(0)) + g = S.dsum(g.reshape(T, N, -1), dim=2).reshape(T, N, C, NZ) + return grad[None, :, None, None] * g, None, None, None, None, None - betas = torch.ones(T + 1, N, C) - a = vT - betas[T] = a - for t in reversed(range(T)): - s = S.mul(a[:, idx_T // NZ], Ms[t, :, idx_T]) - a = S.sum(s, dim=-1) - betas[t] = a - return betas +class CTC_CRF_CPU(CTC_CRF): + + def logZ(self, scores, S:semiring=Log): + T, N, _ = scores.shape + Ms = scores.reshape(T, N, -1, self.n_base + 1) + v0 = Ms.new_full((N, self.n_base**(self.state_len)), S.one) + vT = Ms.new_full((N, self.n_base**(self.state_len)), S.one) + return LogZ.apply(Ms, self.idx.to(torch.int64), v0, vT, scan, S) + + def forward_scores(self, scores, S: semiring=Log): + T, N, _ = scores.shape + Ms = scores.reshape(T, N, -1, self.n_base + 1) + v0 = Ms.new_full((N, self.n_base**(self.state_len)), S.one) + return scan(Ms, self.idx.to(torch.int64), v0, S) + + def backward_scores(self, scores, S: semiring=Log): + T, N, _ = scores.shape + vT = scores.new_full((N, self.n_base**(self.state_len)), S.one) + idx_T = self.idx.flatten().argsort().reshape(*self.idx.shape) + Ms_T = scores[:, :, idx_T] + idx_T = torch.div(idx_T, self.n_base + 1, rounding_mode='floor') + return scan(Ms_T.flip(0), idx_T.to(torch.int64), vT, S).flip(0) class OpenVINOCRFModel(OpenVINOModel): def __init__(self, model, dirname): super().__init__(model, dirname) - self.seqdist = model.seqdist + self.seqdist = CTC_CRF_CPU(model.seqdist.state_len, model.seqdist.alphabet) def __call__(self, data): if self.exec_net is None: self.init_model(self.model.encoder, [1, 1, data.shape[-1]]) return self.process(data) - - def forward_scores(self, scores, S: semiring=Log): - T, N, _ = scores.shape - Ms = scores.reshape(T, N, -1, self.seqdist.n_base + 1) - v0 = Ms.new_full((N, self.seqdist.n_base**(self.seqdist.state_len)), S.one) - return logZ_fwd_cpu(Ms, self.seqdist.idx, v0, S) - - def backward_scores(self, scores, S: semiring=Log): - T, N, _ = scores.shape - Ms = scores.reshape(T, N, -1, self.seqdist.n_base + 1) - beta_T = Ms.new_full((N, self.seqdist.n_base**(self.seqdist.state_len)), S.one) - return logZ_bwd_cpu(Ms, self.seqdist.idx, beta_T, S) From ec54eed3ace91c544e9466463c01e9e0b4bc4dcf Mon Sep 17 00:00:00 2001 From: Dmitry Kurtaev Date: Tue, 15 Mar 2022 12:34:06 +0300 Subject: [PATCH 5/8] Use OpenVINO 2022 with new API and batching --- bonito/openvino/model.py | 83 ++++++++++++++++------------------------ setup.py | 2 +- 2 files changed, 33 insertions(+), 52 deletions(-) diff --git a/bonito/openvino/model.py b/bonito/openvino/model.py index 82a2fb3f..34818a9a 100644 --- a/bonito/openvino/model.py +++ b/bonito/openvino/model.py @@ -1,4 +1,5 @@ import os +from math import ceil import torch import numpy as np from io import BytesIO @@ -6,7 +7,7 @@ from bonito.crf.model import CTC_CRF from torch.nn import Conv2d, BatchNorm2d -try: from openvino.inference_engine import IECore, StatusCode +try: from openvino.runtime import Core, AsyncInferQueue except ImportError: pass @@ -59,10 +60,10 @@ def __init__(self, model, dirname): self.alphabet = model.alphabet self.parameters = model.parameters self.stride = model.stride - self.net = None - self.exec_net = None + self.infer_queue = None self.dirname = dirname 
- self.ie = IECore() + self.batch_size = 32 + self.ie = Core() def eval(self): pass @@ -84,73 +85,53 @@ def init_model(self, model, inp_shape): # First, we try to check if there is IR on disk. If not - load model in runtime xml_path, bin_path = [os.path.join(self.dirname, 'model') + ext for ext in ['.xml', '.bin']] if os.path.exists(xml_path) and os.path.exists(bin_path): - self.net = self.ie.read_network(xml_path, bin_path) + net = self.ie.read_model(xml_path, bin_path) else: # Convert model to ONNX buffer buf = BytesIO() + inp_shape[0] = self.batch_size inp = torch.randn(inp_shape) torch.onnx.export(model, inp, buf, input_names=['input'], output_names=['output'], opset_version=11) # Import network from memory buffer - self.net = self.ie.read_network(buf.getvalue(), b'', init_from_buffer=True) + net = self.ie.read_model(buf.getvalue(), b'') + + self.output_shape = list(net.outputs[0].get_tensor().shape) # Load model to device - config = {} if self.device == 'CPU': - config={'CPU_THROUGHPUT_STREAMS': 'CPU_THROUGHPUT_AUTO'} - self.exec_net = self.ie.load_network(self.net, self.device, config=config, num_requests=0) + self.ie.set_property('CPU', {'CPU_THROUGHPUT_STREAMS': 'CPU_THROUGHPUT_AUTO', 'CPU_BIND_THREAD': 'YES'}) + + compiled_model = self.ie.compile_model(net, self.device) + num_requests = compiled_model.get_property('OPTIMAL_NUMBER_OF_INFER_REQUESTS') + self.infer_queue = AsyncInferQueue(compiled_model, num_requests) def process(self, data): data = data.float() - batch_size = data.shape[0] - inp_shape = list(data.shape) - inp_shape[0] = 1 # We will run the batch asynchronously + num_samples = data.shape[0] - # List that maps infer requests to index of processed chunk from batch. - # -1 means that request has not been started yet. - infer_request_input_id = [-1] * len(self.exec_net.requests) - out_shape = self.net.outputs['output'].shape + out_shape = self.output_shape if len(out_shape) == 3: - out_shape = [1, *out_shape] + out_shape = [self.batch_size, *out_shape] # CTC network produces 1xWxNxC - output = np.zeros([out_shape[-3], batch_size, out_shape[-1]], dtype=np.float32) - - for inp_id in range(batch_size): - # Get idle infer request - infer_request_id = self.exec_net.get_idle_request_id() - if infer_request_id < 0: - status = self.exec_net.wait(num_requests=1) - if status != StatusCode.OK: - raise Exception("Wait for idle request failed!") - infer_request_id = self.exec_net.get_idle_request_id() - if infer_request_id < 0: - raise Exception("Invalid request id!") - - out_id = infer_request_input_id[infer_request_id] - request = self.exec_net.requests[infer_request_id] - - # Copy output prediction - if out_id != -1: - output[:, out_id:out_id + 1] = request.output_blobs['output'].buffer + output = np.zeros([out_shape[-3], num_samples, out_shape[-1]], dtype=np.float32) - # Start this request on new data - infer_request_input_id[infer_request_id] = inp_id - request.async_infer({'input': data[inp_id]}) - inp_id += 1 + def completion_callback(request, inp_id): + out_i = next(iter(request.results.values())) + output[:, inp_id:inp_id + self.batch_size] = out_i - # Wait for the rest of requests - status = self.exec_net.wait() - if status != StatusCode.OK: - raise Exception("Wait for idle request failed!") + self.infer_queue.set_callback(completion_callback) + + for inp_id in range(ceil(num_samples / self.batch_size)): + # Start this request on new data + inp_id *= self.batch_size + inp_id = min(inp_id, data.shape[0] - self.batch_size) + self.infer_queue.start_async({'input': 
data[inp_id:inp_id + self.batch_size]}, inp_id) - for infer_request_id, out_id in enumerate(infer_request_input_id): - if out_id == -1: - continue - request = self.exec_net.requests[infer_request_id] - output[:, out_id:out_id + 1] = request.output_blobs['output'].buffer + self.infer_queue.wait_all() return torch.tensor(output) @@ -162,7 +143,7 @@ def __init__(self, model, dirname): def __call__(self, data): data = data.unsqueeze(2) # 1D->2D - if self.exec_net is None: + if self.infer_queue is None: convert_to_2d(self.model) self.init_model(self.model, [1, 1, 1, data.shape[-1]]) return self.process(data) @@ -252,6 +233,6 @@ def __init__(self, model, dirname): self.seqdist = CTC_CRF_CPU(model.seqdist.state_len, model.seqdist.alphabet) def __call__(self, data): - if self.exec_net is None: + if self.infer_queue is None: self.init_model(self.model.encoder, [1, 1, data.shape[-1]]) return self.process(data) diff --git a/setup.py b/setup.py index 71304378..0b6fa926 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ include_package_data=True, install_requires=requirements, extras_require = { - 'openvino': ['openvino==2021.4.2'], + 'openvino': ['openvino~=2022.1.0.dev'], }, long_description=long_description, long_description_content_type='text/markdown', From 49b4a663c788528a42bd969d6d692f5d90a56926 Mon Sep 17 00:00:00 2001 From: Dmitry Kurtaev Date: Tue, 15 Mar 2022 13:08:45 +0300 Subject: [PATCH 6/8] Use batching with OpenVINO --- bonito/openvino/model.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/bonito/openvino/model.py b/bonito/openvino/model.py index 82a2fb3f..86e9aa84 100644 --- a/bonito/openvino/model.py +++ b/bonito/openvino/model.py @@ -1,4 +1,5 @@ import os +from math import ceil import torch import numpy as np from io import BytesIO @@ -62,6 +63,7 @@ def __init__(self, model, dirname): self.net = None self.exec_net = None self.dirname = dirname + self.batch_size = 32 self.ie = IECore() def eval(self): @@ -88,6 +90,7 @@ def init_model(self, model, inp_shape): else: # Convert model to ONNX buffer buf = BytesIO() + inp_shape[0] = self.batch_size inp = torch.randn(inp_shape) torch.onnx.export(model, inp, buf, input_names=['input'], output_names=['output'], opset_version=11) @@ -103,9 +106,7 @@ def init_model(self, model, inp_shape): def process(self, data): data = data.float() - batch_size = data.shape[0] - inp_shape = list(data.shape) - inp_shape[0] = 1 # We will run the batch asynchronously + num_samples = data.shape[0] # List that maps infer requests to index of processed chunk from batch. # -1 means that request has not been started yet. 
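To make the control flow of this patch easier to follow outside diff context, here is a self-contained, hedged sketch of the idle-request batching pattern the hunks above and below implement with the 2021 `openvino.inference_engine` API. The model files, toy shapes, and the `'input'`/`'output'` tensor names are placeholder assumptions matching this series' conventions, not fixed OpenVINO names.

```python
# Hedged sketch of patch 6's batched async loop (OpenVINO 2021 API).
# Assumes model.xml/model.bin exist and take [32, 1, 4000] inputs,
# with an input named 'input' and an output named 'output'.
import numpy as np
from openvino.inference_engine import IECore, StatusCode

ie = IECore()
exec_net = ie.load_network(ie.read_network("model.xml", "model.bin"),
                           "CPU", num_requests=0)  # 0 = runtime picks the pool size

owner = [-1] * len(exec_net.requests)  # batch offset each request is processing
outputs = {}
data = np.random.rand(128, 1, 4000).astype(np.float32)
batch_size = 32

for i in range(len(data) // batch_size):
    start = i * batch_size
    req_id = exec_net.get_idle_request_id()
    if req_id < 0:  # every request busy: block until one finishes
        assert exec_net.wait(num_requests=1) == StatusCode.OK
        req_id = exec_net.get_idle_request_id()
    if owner[req_id] != -1:  # harvest the result this request last produced
        outputs[owner[req_id]] = exec_net.requests[req_id].output_blobs["output"].buffer.copy()
    owner[req_id] = start
    exec_net.requests[req_id].async_infer({"input": data[start:start + batch_size]})

exec_net.wait()  # drain the in-flight requests
for req_id, start in enumerate(owner):
    if start != -1:
        outputs[start] = exec_net.requests[req_id].output_blobs["output"].buffer.copy()
```

The 2022 `AsyncInferQueue` used in patches 5 and 8 folds this harvest-before-reuse bookkeeping into completion callbacks.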
@@ -116,9 +117,9 @@ def process(self, data): out_shape = [1, *out_shape] # CTC network produces 1xWxNxC - output = np.zeros([out_shape[-3], batch_size, out_shape[-1]], dtype=np.float32) + output = np.zeros([out_shape[-3], num_samples, out_shape[-1]], dtype=np.float32) - for inp_id in range(batch_size): + for inp_id in range(ceil(num_samples / self.batch_size)): # Get idle infer request infer_request_id = self.exec_net.get_idle_request_id() if infer_request_id < 0: @@ -134,12 +135,13 @@ def process(self, data): # Copy output prediction if out_id != -1: - output[:, out_id:out_id + 1] = request.output_blobs['output'].buffer + output[:, out_id:out_id + self.batch_size] = request.output_blobs['output'].buffer # Start this request on new data + inp_id *= self.batch_size + inp_id = min(inp_id, data.shape[0] - self.batch_size) infer_request_input_id[infer_request_id] = inp_id - request.async_infer({'input': data[inp_id]}) - inp_id += 1 + request.async_infer({'input': data[inp_id:inp_id + self.batch_size]}) # Wait for the rest of requests status = self.exec_net.wait() @@ -150,7 +152,7 @@ def process(self, data): if out_id == -1: continue request = self.exec_net.requests[infer_request_id] - output[:, out_id:out_id + 1] = request.output_blobs['output'].buffer + output[:, out_id:out_id + self.batch_size] = request.output_blobs['output'].buffer return torch.tensor(output) From a2c9cf9c0d5691b820c674e28a3800ef04cac8b6 Mon Sep 17 00:00:00 2001 From: Chris Seymour Date: Wed, 16 Mar 2022 22:27:35 +0000 Subject: [PATCH 7/8] Revert "Use OpenVINO 2022" --- bonito/openvino/model.py | 83 ++++++++++++++++++++++++---------------- setup.py | 2 +- 2 files changed, 52 insertions(+), 33 deletions(-) diff --git a/bonito/openvino/model.py b/bonito/openvino/model.py index 34818a9a..82a2fb3f 100644 --- a/bonito/openvino/model.py +++ b/bonito/openvino/model.py @@ -1,5 +1,4 @@ import os -from math import ceil import torch import numpy as np from io import BytesIO @@ -7,7 +6,7 @@ from bonito.crf.model import CTC_CRF from torch.nn import Conv2d, BatchNorm2d -try: from openvino.runtime import Core, AsyncInferQueue +try: from openvino.inference_engine import IECore, StatusCode except ImportError: pass @@ -60,10 +59,10 @@ def __init__(self, model, dirname): self.alphabet = model.alphabet self.parameters = model.parameters self.stride = model.stride - self.infer_queue = None + self.net = None + self.exec_net = None self.dirname = dirname - self.batch_size = 32 - self.ie = Core() + self.ie = IECore() def eval(self): pass @@ -85,53 +84,73 @@ def init_model(self, model, inp_shape): # First, we try to check if there is IR on disk. 
If not - load model in runtime xml_path, bin_path = [os.path.join(self.dirname, 'model') + ext for ext in ['.xml', '.bin']] if os.path.exists(xml_path) and os.path.exists(bin_path): - net = self.ie.read_model(xml_path, bin_path) + self.net = self.ie.read_network(xml_path, bin_path) else: # Convert model to ONNX buffer buf = BytesIO() - inp_shape[0] = self.batch_size inp = torch.randn(inp_shape) torch.onnx.export(model, inp, buf, input_names=['input'], output_names=['output'], opset_version=11) # Import network from memory buffer - net = self.ie.read_model(buf.getvalue(), b'') - - self.output_shape = list(net.outputs[0].get_tensor().shape) + self.net = self.ie.read_network(buf.getvalue(), b'', init_from_buffer=True) # Load model to device + config = {} if self.device == 'CPU': - self.ie.set_property('CPU', {'CPU_THROUGHPUT_STREAMS': 'CPU_THROUGHPUT_AUTO', 'CPU_BIND_THREAD': 'YES'}) - - compiled_model = self.ie.compile_model(net, self.device) - num_requests = compiled_model.get_property('OPTIMAL_NUMBER_OF_INFER_REQUESTS') - self.infer_queue = AsyncInferQueue(compiled_model, num_requests) + config={'CPU_THROUGHPUT_STREAMS': 'CPU_THROUGHPUT_AUTO'} + self.exec_net = self.ie.load_network(self.net, self.device, config=config, num_requests=0) def process(self, data): data = data.float() - num_samples = data.shape[0] + batch_size = data.shape[0] + inp_shape = list(data.shape) + inp_shape[0] = 1 # We will run the batch asynchronously - out_shape = self.output_shape + # List that maps infer requests to index of processed chunk from batch. + # -1 means that request has not been started yet. + infer_request_input_id = [-1] * len(self.exec_net.requests) + out_shape = self.net.outputs['output'].shape if len(out_shape) == 3: - out_shape = [self.batch_size, *out_shape] + out_shape = [1, *out_shape] # CTC network produces 1xWxNxC - output = np.zeros([out_shape[-3], num_samples, out_shape[-1]], dtype=np.float32) - - def completion_callback(request, inp_id): - out_i = next(iter(request.results.values())) - output[:, inp_id:inp_id + self.batch_size] = out_i + output = np.zeros([out_shape[-3], batch_size, out_shape[-1]], dtype=np.float32) + + for inp_id in range(batch_size): + # Get idle infer request + infer_request_id = self.exec_net.get_idle_request_id() + if infer_request_id < 0: + status = self.exec_net.wait(num_requests=1) + if status != StatusCode.OK: + raise Exception("Wait for idle request failed!") + infer_request_id = self.exec_net.get_idle_request_id() + if infer_request_id < 0: + raise Exception("Invalid request id!") + + out_id = infer_request_input_id[infer_request_id] + request = self.exec_net.requests[infer_request_id] + + # Copy output prediction + if out_id != -1: + output[:, out_id:out_id + 1] = request.output_blobs['output'].buffer - self.infer_queue.set_callback(completion_callback) - - for inp_id in range(ceil(num_samples / self.batch_size)): # Start this request on new data - inp_id *= self.batch_size - inp_id = min(inp_id, data.shape[0] - self.batch_size) - self.infer_queue.start_async({'input': data[inp_id:inp_id + self.batch_size]}, inp_id) + infer_request_input_id[infer_request_id] = inp_id + request.async_infer({'input': data[inp_id]}) + inp_id += 1 + + # Wait for the rest of requests + status = self.exec_net.wait() + if status != StatusCode.OK: + raise Exception("Wait for idle request failed!") - self.infer_queue.wait_all() + for infer_request_id, out_id in enumerate(infer_request_input_id): + if out_id == -1: + continue + request = self.exec_net.requests[infer_request_id] + 
output[:, out_id:out_id + 1] = request.output_blobs['output'].buffer return torch.tensor(output) @@ -143,7 +162,7 @@ def __init__(self, model, dirname): def __call__(self, data): data = data.unsqueeze(2) # 1D->2D - if self.infer_queue is None: + if self.exec_net is None: convert_to_2d(self.model) self.init_model(self.model, [1, 1, 1, data.shape[-1]]) return self.process(data) @@ -233,6 +252,6 @@ def __init__(self, model, dirname): self.seqdist = CTC_CRF_CPU(model.seqdist.state_len, model.seqdist.alphabet) def __call__(self, data): - if self.infer_queue is None: + if self.exec_net is None: self.init_model(self.model.encoder, [1, 1, data.shape[-1]]) return self.process(data) diff --git a/setup.py b/setup.py index 0b6fa926..71304378 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ include_package_data=True, install_requires=requirements, extras_require = { - 'openvino': ['openvino~=2022.1.0.dev'], + 'openvino': ['openvino==2021.4.2'], }, long_description=long_description, long_description_content_type='text/markdown', From 676fdc52ba91b4d8583e0b73140d3e88978f2846 Mon Sep 17 00:00:00 2001 From: Dmitry Kurtaev Date: Tue, 15 Mar 2022 12:34:06 +0300 Subject: [PATCH 8/8] Use OpenVINO 2022 with new API --- bonito/openvino/model.py | 69 ++++++++++++++-------------------------- setup.py | 2 +- 2 files changed, 25 insertions(+), 46 deletions(-) diff --git a/bonito/openvino/model.py b/bonito/openvino/model.py index 86e9aa84..34818a9a 100644 --- a/bonito/openvino/model.py +++ b/bonito/openvino/model.py @@ -7,7 +7,7 @@ from bonito.crf.model import CTC_CRF from torch.nn import Conv2d, BatchNorm2d -try: from openvino.inference_engine import IECore, StatusCode +try: from openvino.runtime import Core, AsyncInferQueue except ImportError: pass @@ -60,11 +60,10 @@ def __init__(self, model, dirname): self.alphabet = model.alphabet self.parameters = model.parameters self.stride = model.stride - self.net = None - self.exec_net = None + self.infer_queue = None self.dirname = dirname self.batch_size = 32 - self.ie = IECore() + self.ie = Core() def eval(self): pass @@ -86,7 +85,7 @@ def init_model(self, model, inp_shape): # First, we try to check if there is IR on disk. 
If not - load model in runtime xml_path, bin_path = [os.path.join(self.dirname, 'model') + ext for ext in ['.xml', '.bin']] if os.path.exists(xml_path) and os.path.exists(bin_path): - self.net = self.ie.read_network(xml_path, bin_path) + net = self.ie.read_model(xml_path, bin_path) else: # Convert model to ONNX buffer buf = BytesIO() @@ -95,64 +94,44 @@ def init_model(self, model, inp_shape): torch.onnx.export(model, inp, buf, input_names=['input'], output_names=['output'], opset_version=11) # Import network from memory buffer - self.net = self.ie.read_network(buf.getvalue(), b'', init_from_buffer=True) + net = self.ie.read_model(buf.getvalue(), b'') + + self.output_shape = list(net.outputs[0].get_tensor().shape) # Load model to device - config = {} if self.device == 'CPU': - config={'CPU_THROUGHPUT_STREAMS': 'CPU_THROUGHPUT_AUTO'} - self.exec_net = self.ie.load_network(self.net, self.device, config=config, num_requests=0) + self.ie.set_property('CPU', {'CPU_THROUGHPUT_STREAMS': 'CPU_THROUGHPUT_AUTO', 'CPU_BIND_THREAD': 'YES'}) + + compiled_model = self.ie.compile_model(net, self.device) + num_requests = compiled_model.get_property('OPTIMAL_NUMBER_OF_INFER_REQUESTS') + self.infer_queue = AsyncInferQueue(compiled_model, num_requests) def process(self, data): data = data.float() num_samples = data.shape[0] - # List that maps infer requests to index of processed chunk from batch. - # -1 means that request has not been started yet. - infer_request_input_id = [-1] * len(self.exec_net.requests) - out_shape = self.net.outputs['output'].shape + out_shape = self.output_shape if len(out_shape) == 3: - out_shape = [1, *out_shape] + out_shape = [self.batch_size, *out_shape] # CTC network produces 1xWxNxC output = np.zeros([out_shape[-3], num_samples, out_shape[-1]], dtype=np.float32) - for inp_id in range(ceil(num_samples / self.batch_size)): - # Get idle infer request - infer_request_id = self.exec_net.get_idle_request_id() - if infer_request_id < 0: - status = self.exec_net.wait(num_requests=1) - if status != StatusCode.OK: - raise Exception("Wait for idle request failed!") - infer_request_id = self.exec_net.get_idle_request_id() - if infer_request_id < 0: - raise Exception("Invalid request id!") - - out_id = infer_request_input_id[infer_request_id] - request = self.exec_net.requests[infer_request_id] - - # Copy output prediction - if out_id != -1: - output[:, out_id:out_id + self.batch_size] = request.output_blobs['output'].buffer + def completion_callback(request, inp_id): + out_i = next(iter(request.results.values())) + output[:, inp_id:inp_id + self.batch_size] = out_i + self.infer_queue.set_callback(completion_callback) + + for inp_id in range(ceil(num_samples / self.batch_size)): # Start this request on new data inp_id *= self.batch_size inp_id = min(inp_id, data.shape[0] - self.batch_size) - infer_request_input_id[infer_request_id] = inp_id - request.async_infer({'input': data[inp_id:inp_id + self.batch_size]}) - - # Wait for the rest of requests - status = self.exec_net.wait() - if status != StatusCode.OK: - raise Exception("Wait for idle request failed!") + self.infer_queue.start_async({'input': data[inp_id:inp_id + self.batch_size]}, inp_id) - for infer_request_id, out_id in enumerate(infer_request_input_id): - if out_id == -1: - continue - request = self.exec_net.requests[infer_request_id] - output[:, out_id:out_id + self.batch_size] = request.output_blobs['output'].buffer + self.infer_queue.wait_all() return torch.tensor(output) @@ -164,7 +143,7 @@ def __init__(self, model, dirname): def 
__call__(self, data): data = data.unsqueeze(2) # 1D->2D - if self.exec_net is None: + if self.infer_queue is None: convert_to_2d(self.model) self.init_model(self.model, [1, 1, 1, data.shape[-1]]) return self.process(data) @@ -254,6 +233,6 @@ def __init__(self, model, dirname): self.seqdist = CTC_CRF_CPU(model.seqdist.state_len, model.seqdist.alphabet) def __call__(self, data): - if self.exec_net is None: + if self.infer_queue is None: self.init_model(self.model.encoder, [1, 1, data.shape[-1]]) return self.process(data) diff --git a/setup.py b/setup.py index 71304378..51229ba4 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ include_package_data=True, install_requires=requirements, extras_require = { - 'openvino': ['openvino==2021.4.2'], + 'openvino': ['openvino==2022.1.0'], }, long_description=long_description, long_description_content_type='text/markdown',
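The series ends on the OpenVINO 2022 `AsyncInferQueue` version from patch 8. As a closing reference, here is a hedged, standalone sketch of that pattern; the model files, the `'input'` tensor name, and the toy shapes are placeholder assumptions, and only the queue mechanics mirror the patch.

```python
# Hedged sketch of the final AsyncInferQueue pattern (OpenVINO 2022 API).
# Placeholders: model.xml/model.bin on disk, an input named 'input', toy shapes.
from math import ceil

import numpy as np
from openvino.runtime import AsyncInferQueue, Core

core = Core()
compiled = core.compile_model(core.read_model("model.xml", "model.bin"), "CPU")
queue = AsyncInferQueue(compiled, compiled.get_property("OPTIMAL_NUMBER_OF_INFER_REQUESTS"))

batch_size = 32
data = np.random.rand(128, 1, 4000).astype(np.float32)
results = {}

def on_done(request, start):
    # Callbacks fire on inference threads; copy the output before the
    # request's tensors are reused for the next batch.
    results[start] = next(iter(request.results.values())).copy()

queue.set_callback(on_done)

for i in range(ceil(len(data) / batch_size)):
    # Clamp the final batch so it overlaps rather than underfills, as the patch does.
    start = min(i * batch_size, len(data) - batch_size)
    queue.start_async({"input": data[start:start + batch_size]}, start)

queue.wait_all()  # results now maps batch offset -> network output
```

Relative to patch 6's manual loop, the queue owns request scheduling and completion ordering, leaving only the question of where each finished batch lands in the stitched output.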