I have tried changing almost every parameter tf.data.Dataset offers, but none of it seems to help. Loading the data is my bottleneck.
Here are my code and other details.
import os
from functools import partial

import pandas as pd
import tensorflow as tf


def feature_name(column, suffix_name="_f", reverse=False):
    if reverse:
        result = column[:-len(suffix_name)]
    else:
        result = column + suffix_name
    return result
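
# Usage sketch (hypothetical column name "price"):
#   feature_name("price") -> "price_f"
#   feature_name("price_f", reverse=True) -> "price"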

def time_predict_feature_config_from_data_meta(data_meta: pd.DataFrame, past_length, future_length, retain=False):
    feature_config = {
        "context": {},
        "sequence": {},
        "serialize": {},
    }
    time_steps = past_length + future_length
    for column_info in data_meta.itertuples():
        if column_info.time_type in ("static", "future_known", "future_unknown"):
            if column_info.time_type == "static":
                shape = []
            elif column_info.time_type == "future_known":
                shape = [time_steps]
            else:  # "future_unknown"
                shape = [past_length]
            if column_info.feature_type == "dense":
                shape.append(1)
                dtype = "float"
            else:
                dtype = "int"
            feature_category = "context"
            # if len(shape) == 2:
            #     feature_category = "sequence"
            feature_config[feature_category].update(
                {feature_name(column_info.column_name):
                    {
                        "shape": shape,
                        "dtype": dtype
                    }
                }
            )
        if retain and column_info.retain == "1":
            feature_config["context"].update(
                {column_info.column_name:
                    {
                        "shape": [],
                        "dtype": column_info.data_type
                    }
                }
            )
        if column_info.label == "1":
            feature_config["context"].update(
                {column_info.column_name:
                    {
                        "shape": [future_length, 1],
                        "dtype": column_info.data_type
                    }
                }
            )
    return feature_config
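
# Sketch of what one data_meta row produces (row values are hypothetical):
# a row with column_name="price", time_type="future_unknown", feature_type="dense"
# and past_length=93 yields, under "context":
#     {"price_f": {"shape": [93, 1], "dtype": "float"}}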

class TFRecordPipeLine:
    def __init__(
            self,
            feature_config: dict = None,  # None instead of {} to avoid a mutable default argument
            batch_size: int = 8,
    ):
        self.batch_size = batch_size
        self.feature_config = feature_config if feature_config is not None else {}
        self.context_description, self.sequence_description = self.feature_config_decoder(self.feature_config)

    @staticmethod
    def data_type_transform(dtype: str, data='tf'):
        if dtype.startswith("int"):
            return tf.int64
        elif dtype.startswith("float"):
            return tf.float32
        elif dtype.startswith("string"):
            return tf.string
        else:
            raise ValueError(f"only accepts [int, float, string], but got {dtype}.")

    @staticmethod
    def feature_config_decoder(feature_config: dict):
        """
        Feature description config decoder.

        :param feature_config: feature_config is a dict object, e.g.
            {
                "context": {
                    "x1": {"shape": [1], "dtype": "float"},
                    "x2": {"shape": [2], "dtype": "int"},
                    "x3": {"shape": [3], "dtype": "string"}
                },
                "sequence": {
                    "s1": {"dtype": "float"},
                    "s2": {"dtype": "int"}
                },
                "serialize": {
                    "d1": {"shape": [32, 32], "dtype": "float"},
                    "d2": {"shape": [24, 24], "dtype": "int"}
                }
            }
        :return: context_description & sequence_description
        """
        # Example
        context_description = dict()
        # SequenceExample
        sequence_description = dict()
        context_feature_info = feature_config.get("context")
        if context_feature_info is not None:
            for key, value in context_feature_info.items():
                _shape, _type = value.get("shape"), value.get("dtype")
                _data_type = TFRecordPipeLine.data_type_transform(dtype=_type, data='tf')
                if value.get("default_val") is None:
                    _def_val = None
                else:
                    _def_val = tf.constant(value.get("default_val"), dtype=_data_type, shape=_shape)
                if _shape is None:
                    context_description[key] = tf.io.VarLenFeature(
                        dtype=_data_type
                    )
                else:
                    context_description[key] = tf.io.FixedLenFeature(
                        shape=_shape, dtype=_data_type, default_value=_def_val
                    )
        sequence_feature_info = feature_config.get("sequence")
        if sequence_feature_info is not None:
            for key, value in sequence_feature_info.items():
                _data_type = TFRecordPipeLine.data_type_transform(dtype=value.get("dtype"), data='tf')
                _shape = value.get("shape")
                if _shape is None:
                    sequence_description[key] = tf.io.VarLenFeature(
                        dtype=_data_type
                    )
                else:
                    if len(_shape) < 2:
                        raise ValueError(f"You must provide a shape with at least 2 dims for a `sequence` feature, "
                                         f"but got {_shape}.")
                    if value.get("default_val") is not None:
                        raise ValueError(f"`default_val` is not supported when `shape` is set for a `sequence` feature, "
                                         f"feature: {key}.")
                    sequence_description[key] = tf.io.FixedLenSequenceFeature(
                        dtype=_data_type,
                        shape=_shape[1:]
                    )
        serialize_feature_info = feature_config.get("serialize")
        if serialize_feature_info is not None:
            for key, value in serialize_feature_info.items():
                if value.get("default_val") is None:
                    _def_val = None
                else:
                    # resolve the string dtype so the default bytes match what decode_raw expects
                    _def_val = tf.constant(
                        value.get("default_val"),
                        dtype=TFRecordPipeLine.data_type_transform(value.get("dtype")),
                        shape=value.get("shape")
                    ).numpy().tobytes()
                context_description[key] = tf.io.FixedLenFeature(shape=[1], dtype=tf.string, default_value=_def_val)
        return context_description, sequence_description
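
    # For the docstring example above, this yields (sketch):
    #   context_description: x1 -> FixedLenFeature([1], tf.float32),
    #     x2 -> FixedLenFeature([2], tf.int64), x3 -> FixedLenFeature([3], tf.string),
    #     d1/d2 -> FixedLenFeature([1], tf.string) (decoded later via decode_raw);
    #   sequence_description: s1 -> VarLenFeature(tf.float32), s2 -> VarLenFeature(tf.int64).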

    @staticmethod
    def parse_example(proto, context_description: dict, sequence_description: dict,
                      feature_config: dict, batch_size: int = 8):
        """
        :param proto: The protobuf-serialized data.
        :param context_description: The example feature description.
        :param sequence_description: The sequence-example feature description.
        :param feature_config: The full feature configuration.
        :param batch_size: The size of the batch.
        :return: the merged feature dict.
        """
        _context_feature, _sequence_feature, _ = tf.io.parse_sequence_example(
            proto,
            context_features=context_description,
            sequence_features=sequence_description
        )
        for key, value in _sequence_feature.items():
            _shape = feature_config.get("sequence").get(key).get("shape")
            _def_val = feature_config.get("sequence").get(key).get("default_val")
            if _shape is None:
                _sequence_feature[key] = tf.sparse.to_dense(value, default_value=_def_val)
        if feature_config.get("serialize") is not None:
            for key in feature_config.get("serialize").keys():
                _val = _context_feature[key]
                _origin_dtype = TFRecordPipeLine.data_type_transform(
                    feature_config.get("serialize").get(key).get("dtype"),
                    data="tf"
                )
                _origin_shape = feature_config.get("serialize").get(key).get("shape")
                _decode_val = tf.io.decode_raw(_val, _origin_dtype)
                # NOTE: this reshape assumes every batch is full; a final partial
                # batch will fail unless the batch uses drop_remainder=True.
                _decode_val = tf.reshape(_decode_val, [batch_size] + _origin_shape)
                _context_feature[key] = _decode_val
        if feature_config.get("context") is not None:
            for key in feature_config.get("context").keys():
                _shape = feature_config.get("context").get(key).get("shape")
                if _shape is None:
                    _val = _context_feature[key]
                    _def_val = feature_config.get("context").get(key).get("default_val")
                    _context_feature[key] = tf.sparse.to_dense(_val, default_value=_def_val)
        _feature = {**_context_feature, **_sequence_feature}
        return _feature
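
    # Writer-side sketch (my assumption, not shown above): a "serialize" feature
    # must be written as the raw bytes of an array whose dtype matches
    # data_type_transform, e.g. for "float":
    #     arr = np.asarray(x, dtype=np.float32)
    #     feat = tf.train.Feature(bytes_list=tf.train.BytesList(value=[arr.tobytes()]))
    # so that tf.io.decode_raw(..., tf.float32) above recovers it exactly.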

    def process(self, file_path, **kwargs):
        parse_function = tf.function(
            partial(self.parse_example,
                    context_description=self.context_description,
                    sequence_description=self.sequence_description,
                    feature_config=self.feature_config,
                    batch_size=self.batch_size)
        )
        tfrecord_file = self.local_path_decode(file_path=file_path)
        # dataset = tf.data.TFRecordDataset(
        #     filenames=tfrecord_file,
        #     compression_type=kwargs.get("compression_type"),
        #     buffer_size=kwargs.get("buffer_size"),
        #     num_parallel_reads=kwargs.get("num_parallel_reads", tf.data.AUTOTUNE)
        # )
        # dataset = tf.data.Dataset.list_files(tfrecord_file, shuffle=kwargs.get("is_train", False))
        dataset = tf.data.Dataset.from_tensor_slices(tfrecord_file)
        if kwargs.get("is_train", False):
            dataset = dataset.shuffle(
                buffer_size=kwargs.get("file_shuffle_size", 4),
                seed=kwargs.get("shuffle_seed"),
                reshuffle_each_iteration=kwargs.get("file_reshuffle_each_iteration", False)
            )
        # num_shards = kwargs.get("num_shards", 4)
        # def make_dataset(shard_index, dataset=dataset):
        #     dataset = dataset.shard(num_shards, shard_index)
        #     dataset = tf.data.TFRecordDataset(dataset,
        #                                       compression_type=kwargs.get("compression_type"),
        #                                       num_parallel_reads=kwargs.get("num_parallel_reads"))
        #     return dataset
        # indices = tf.data.Dataset.range(num_shards)
        # dataset = indices.interleave(make_dataset,
        #                              num_parallel_calls=tf.data.AUTOTUNE)
        dataset = dataset.interleave(
            lambda x: tf.data.TFRecordDataset(x,
                                              compression_type=kwargs.get("compression_type"),
                                              num_parallel_reads=kwargs.get("num_parallel_reads", tf.data.AUTOTUNE)),
            cycle_length=kwargs.get("cycle_length"),
            num_parallel_calls=kwargs.get("num_parallel_calls", tf.data.AUTOTUNE),
            deterministic=kwargs.get("interleave_deterministic", True)
        )
        dataset = dataset.batch(
            batch_size=kwargs.get("dummy_batch", self.batch_size),
            num_parallel_calls=kwargs.get("num_parallel_calls", tf.data.AUTOTUNE),
            deterministic=kwargs.get("batch_deterministic", True)
        )
        dataset = dataset.map(
            parse_function,
            num_parallel_calls=kwargs.get("num_parallel_calls", tf.data.AUTOTUNE),
            deterministic=kwargs.get("map_deterministic", True)
        )
        if kwargs.get("cache", False):
            dataset = dataset.cache(filename=kwargs.get("cache_file", ""))
        if kwargs.get("shuffle_size"):
            dataset = dataset.shuffle(
                buffer_size=kwargs.get("shuffle_size"),
                seed=kwargs.get("shuffle_seed"),
                reshuffle_each_iteration=kwargs.get("reshuffle_each_iteration", False)
            )
        # dataset = dataset.rebatch(self.batch_size)
        # dataset = dataset.unbatch()
        # dataset = dataset.batch(self.batch_size)
        dataset = dataset.prefetch(kwargs.get("prefetch_size", tf.data.AUTOTUNE))
        if kwargs.get("option", False):
            # Create a tf.data.Options object. Without options there are some problems:
            # e.g. setting every `deterministic` to False produces no output,
            # and setting every `num_parallel_calls` to AUTOTUNE also seems to produce no output.
            # With options, however, batch-then-rebatch no longer works.
            options = tf.data.Options()
            # # Set options: enable parallel processing but keep determinism
            # options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
            options.threading.max_intra_op_parallelism = kwargs.get("max_intra_op_parallelism", 1)
            options.threading.private_threadpool_size = kwargs.get("private_threadpool_size", 4)
            options.deterministic = kwargs.get("deterministic", True)  # disable determinism to improve performance
            dataset = dataset.with_options(options)
        return dataset

    @staticmethod
    def local_path_decode(file_path):
        path_status = os.path.isdir(file_path)
        file_status = os.path.isfile(file_path)
        local_path = os.path.abspath(file_path)
        local_list = list()
        if not (file_status or path_status):
            raise ValueError(f"local_path: `{file_path}` is invalid.")
        if file_status:
            local_list.append(local_path)
        else:
            file_list = os.listdir(local_path)
            for items in file_list:
                if items.startswith(('_', '~', '.')):
                    continue
                new_path = os.path.join(local_path, items)
                if os.path.isfile(new_path):
                    local_list.append(new_path)
        return local_list

import datetime

feature_config = time_predict_feature_config_from_data_meta(data_meta, past_length=93, future_length=62)
tp_t = TFRecordPipeLine(feature_config, batch_size=2**14)
train_dataset = tp_t.process(
    os.path.join(seq_l2, "tp_type=train"),
    compression_type="GZIP",
    num_parallel_reads=16,
    # num_parallel_calls=32,
    # buffer_size=1024*1024*32,
    # shuffle_size=4,
    prefetch_size=3,
    deterministic=False,
    option=True,
    max_intra_op_parallelism=64,
    private_threadpool_size=32,
    is_train=True,
)

init_start = datetime.datetime.now()
start = datetime.datetime.now()
for i, batch in enumerate(train_dataset):
    end = datetime.datetime.now()
    # print(batch)
    print(end - start)
    start = end
    if i == 99:
        break
print("100 batch total use time:", end - init_start)
My CPU usage is only about 10%, and each batch takes roughly 0.5 s to 1 s, which is too slow.
0:00:03.311197
0:00:00.155027
0:00:00.290817
0:00:00.860181
0:00:00.594197
0:00:00.523642
0:00:00.982845
0:00:01.098462
0:00:01.007237
0:00:01.214216
0:00:01.069441
0:00:00.555538
0:00:01.148678
0:00:01.417670
0:00:00.513149
0:00:00.769899
0:00:00.917635
0:00:01.288421
0:00:00.880603
0:00:01.094904
0:00:00.973094
0:00:00.787760
0:00:01.365806
0:00:02.136255
0:00:00.089710
...
0:00:00.364601
0:00:01.825834
0:00:00.003376
100 batch total use time: 0:02:20.718250
When I train my model (an LSTM), GPU utilization usually drops to 0%. I think the GPU is waiting for data from the CPU at those times. [screenshot: GPU utilization]
I have tried increasing num_parallel_calls, max_intra_op_parallelism, private_threadpool_size, buffer_size, and so on, but none of it helps. Why? Even when I increase these parameters, my CPU usage stays around 10%. I know that setting num_parallel_calls too high can cause thread contention. So, what can I do?
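
For reference, here is a minimal sketch (reusing tp_t and seq_l2 from above) of how I could check whether raw reading, rather than parsing, is the bottleneck:

# Sketch: time raw TFRecord reads with no parsing, batching by record count only,
# to separate I/O + decompression cost from parse_example cost.
files = tp_t.local_path_decode(os.path.join(seq_l2, "tp_type=train"))
raw_ds = tf.data.TFRecordDataset(
    files,
    compression_type="GZIP",
    num_parallel_reads=tf.data.AUTOTUNE,
).batch(2**14)
start = datetime.datetime.now()
for i, _ in enumerate(raw_ds):
    if i == 99:
        break
print("raw read, 100 batches:", datetime.datetime.now() - start)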