Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial support for concurency #2

Draft
wants to merge 14 commits into
base: ea/stateful
Choose a base branch
from
6 changes: 4 additions & 2 deletions optimum/intel/openvino/modeling_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
self.output_names = output_names

self.model = model
self.compiled_model = None
self.request = None
if enable_compilation:
self.compile()
Expand Down Expand Up @@ -341,15 +342,15 @@ def _to_load(
)

def compile(self):
if self.request is None:
if self.compiled_model is None:
logger.info(f"Compiling the model to {self._device} ...")
ov_config = {**self.ov_config}
if "CACHE_DIR" not in self.ov_config.keys() and not str(self.model_save_dir).startswith(gettempdir()):
# Set default CACHE_DIR only if it is not set, and if the model is not in a temporary directory
cache_dir = Path(self.model_save_dir).joinpath("model_cache")
ov_config["CACHE_DIR"] = str(cache_dir)
logger.info(f"Setting OpenVINO CACHE_DIR to {str(cache_dir)}")
self.request = core.compile_model(self.model, self._device, ov_config)
self.compiled_model = core.compile_model(self.model, self._device, ov_config)

def _reshape(
self,
Expand Down Expand Up @@ -388,6 +389,7 @@ def reshape(self, batch_size: int, sequence_length: int, height: int = None, wid
self.is_dynamic = True if batch_size == -1 and sequence_length == -1 else False
self.model = self._reshape(self.model, batch_size, sequence_length, height, width)
self.request = None
self.compiled_model = None
return self

def half(self):
Expand Down
40 changes: 28 additions & 12 deletions optimum/intel/openvino/modeling_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from .modeling import _TOKENIZER_FOR_DOC, INPUTS_DOCSTRING, MODEL_START_DOCSTRING, OVModel
from .utils import ONNX_WEIGHTS_NAME, OV_XML_FILE_NAME, STR_TO_OV_TYPE


if is_transformers_version("<", "4.25.0"):
from transformers.generation_utils import GenerationMixin
else:
Expand Down Expand Up @@ -307,9 +306,8 @@ def reshape(self, batch_size: int, sequence_length: int):
return self

def compile(self):
if self.request is None:
if self.compiled_model is None:
super().compile()
self.request = self.request.create_infer_request()

def _make_stateful(self):
patch_stateful(self.config, self.model)
Expand All @@ -335,9 +333,21 @@ class OVModelForCausalLM(OVBaseDecoderModel, GenerationMixin):
checkpoint="gpt2",
)
)

def create_infer_request(self):
if self.compiled_model is None:
super().compile()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just call self.compile()

return self.compiled_model.create_infer_request()

def generate(self, *args, **kwargs):
if kwargs.get("infer_request") is None:
kwargs["infer_request"] = self.create_infer_request()
return super().generate(*args, **kwargs)

def forward(
self,
input_ids: torch.LongTensor,
infer_request: openvino.runtime.InferRequest,
Copy link
Collaborator

@slyalin slyalin Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it truly backward compatible to add another positional argument to forward that is exposed externally and can be potentially used in some not so trivial examples bypassing generate method? Would it be safer to add it to kwargs instead?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, the safest way would be to pass it as keyword-only argument. It is more descriptive and clear approach than kwargs.

attention_mask: Optional[torch.LongTensor] = None,
past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
position_ids: Optional[torch.LongTensor] = None,
Expand Down Expand Up @@ -399,10 +409,10 @@ def forward(
# It should be something that is not None and it should be True when converted to Boolean.
past_key_values = ((),)
# This is the first iteration in a sequence, reset all states
if hasattr(self.request, "reset_state"):
self.request.reset_state()
if hasattr(infer_request, "reset_state"):
infer_request.reset_state()
else:
for state in self.request.query_state():
for state in infer_request.query_state():
state.reset()
# Set initial value for the next beam_idx input that will be used at the current iteration
# and will be optionally updated by _reorder_cache at the next iterations if beam_search is used
Expand Down Expand Up @@ -436,30 +446,29 @@ def forward(
inputs["beam_idx"] = self.next_beam_idx
Copy link
Collaborator

@slyalin slyalin Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most critical comment: self.next_beam_idx is a shared resource that is updated concurrently from multiple threads. It will lead to incorrect behavior when multiple generates with different batch size or with beam search mode and different prompts, or with any kind of sampling are called. Separate version of next_beam_idx should be created for each generate invocation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed it is failing with parallel generate with different batch sizes. I'll try passing next_beam_idx also in kwargs along with infer_request.


# Run inference
self.request.start_async(inputs, share_inputs=True)
self.request.wait()
logits = torch.from_numpy(self.request.get_tensor("logits").data).to(self.device)
infer_request.start_async(inputs, share_inputs=True)
infer_request.wait()
logits = torch.from_numpy(infer_request.get_tensor("logits").data).to(self.device)

if not self.stateful:
if self.use_cache:
# Tuple of length equal to : number of layer * number of past_key_value per decoder layer (2 corresponds to the self-attention layer)
past_key_values = tuple(self.request.get_tensor(key).data for key in self.key_value_output_names)
past_key_values = tuple(infer_request.get_tensor(key).data for key in self.key_value_output_names)
if self.config.model_type not in MULTI_QUERY_ATTN_MODELS:
# Tuple of tuple of length `n_layers`, with each tuple of length equal to 2 (k/v of self-attention)
past_key_values = tuple(
past_key_values[i : i + self.num_pkv] for i in range(0, len(past_key_values), self.num_pkv)
)
else:
past_key_values = None

return CausalLMOutputWithPast(logits=logits, past_key_values=past_key_values)

# Adapted from transformers.models.gpt2.modeling_gpt2.GPT2LMHeadModel.prepare_inputs_for_generation
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs):
# if model is used as a decoder in encoder-decoder model, the decoder attention mask is created on the fly
attention_mask = kwargs.get("attention_mask", None)
use_cache = kwargs.get("use_cache", None)

infer_request = kwargs.get("infer_request", None)
position_ids = kwargs.get("position_ids", None)
if attention_mask is not None and position_ids is None:
# create position_ids on the fly for batch generation
Expand All @@ -474,6 +483,7 @@ def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwarg
"use_cache": use_cache,
"position_ids": position_ids,
"attention_mask": attention_mask,
"infer_request": infer_request,
}

# Adapted from transformers.models.gpt2.modeling_gpt2.GPT2LMHeadModel._reorder_cache
Expand Down Expand Up @@ -552,6 +562,7 @@ class OVBloomForCausalLM(OVModelForCausalLM):
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs):
attention_mask = kwargs.get("attention_mask", None)
use_cache = kwargs.get("use_cache", None)
infer_request = kwargs.get("infer_request", None)

# only last token for input_ids if past is not None
if past_key_values and not self.stateful:
Expand All @@ -565,6 +576,7 @@ def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwarg
"use_cache": use_cache,
"position_ids": None,
"attention_mask": attention_mask,
"infer_request" : infer_request,
}

# Adapted from transformers.models.bloom.modeling_bloom.BloomForCausalLM._reorder_cache
Expand Down Expand Up @@ -637,13 +649,15 @@ class OVOPTForCausalLM(OVModelForCausalLM):
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs):
attention_mask = kwargs.get("attention_mask", None)
use_cache = kwargs.get("use_cache", None)
infer_request = kwargs.get("infer_request", None)

return {
"input_ids": input_ids,
"past_key_values": past_key_values,
"use_cache": use_cache,
"position_ids": None,
"attention_mask": attention_mask,
"infer_request": infer_request,
}


Expand All @@ -652,13 +666,15 @@ class OVMPTForCausalLM(OVModelForCausalLM):
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs):
attention_mask = kwargs.get("attention_mask", None)
use_cache = kwargs.get("use_cache", None)
infer_request = kwargs.get("infer_request", None)

return {
"input_ids": input_ids,
"past_key_values": past_key_values,
"use_cache": use_cache,
"position_ids": None,
"attention_mask": attention_mask,
"infer_request": infer_request,
}


Expand Down
44 changes: 44 additions & 0 deletions tests/openvino/gen_threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from optimum.intel import OVModelForCausalLM
from transformers import AutoTokenizer, AutoConfig
import threading


#model_path = "/home/devuser/openvino.genai/llm_bench/python/mistral-int8-new-stateful/pytorch/dldt/compressed_weights/OV_FP16-INT8/"
#model_path = "/home/devuser/openvino.genai/llm_bench/python/mistral-int8-new/pytorch/dldt/compressed_weights/OV_FP16-INT8"
#model_path = "/home/devuser/openvino.genai/llm_bench/python/llama-2-7b-chat-hf-stateful/pytorch/dldt/compressed_weights/OV_FP16-INT8/"
model_path = "/home/devuser/openvino.genai/llm_bench/python/llama-2-7b-chat-hf-stateful/pytorch/dldt/compressed_weights/OV_FP16-INT8/"
#model_path = "/home/devuser/openvino.genai/llm_bench/python/llama-2-7b-chat-hf/pytorch/dldt/FP16/"


OV_CONFIG = {'PERFORMANCE_HINT': 'LATENCY', 'CACHE_DIR': '','NUM_STREAMS': '1'}
model = OVModelForCausalLM.from_pretrained(model_path, config=AutoConfig.from_pretrained(model_path, trust_remote_code=True),stateful=True,ov_config=OV_CONFIG)
tokenizer = AutoTokenizer.from_pretrained(model_path)

def gen_thread(prompt):
inputs = tokenizer(prompt, return_tensors="pt")
generate_kwargs = dict(
input_ids=inputs.input_ids,
max_new_tokens=50,
temperature=1.0,
do_sample=True,
top_p=1.0,
top_k=50,
repetition_penalty=1.1,
use_cache=False
)
outputs = model.generate(**generate_kwargs)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))

prompt1 = [" The weather is "]
x = threading.Thread(target=gen_thread, args=(prompt1,))
x.start()
prompt2 = [" Openvino is a "]
y = threading.Thread(target=gen_thread, args=(prompt2,))
y.start()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use more stressful test with beam search that would trigger a race on beam_idx model object field update. Or run multiple generates with different batch sizes.


x.join()
y.join()