0

I have tried changing almost every parameter that tf.data.Dataset offers, but none of my efforts seem to help. Data loading is my bottleneck.

Here are my code and other details.

def feture_name(column, suffix_name="_f", reverse=False):
    """Append ``suffix_name`` to a column name, or strip it when ``reverse`` is True.

    The reverse direction removes the last ``len(suffix_name)`` characters
    without checking that they actually equal ``suffix_name`` (kept for
    backward compatibility with existing callers).

    :param column: Column (or derived feature) name.
    :param suffix_name: Suffix that distinguishes feature names from column names.
    :param reverse: If True, map a feature name back to its column name.
    :return: The derived name.
    """
    if reverse:
        # ``column[:-0]`` is ``""``, so an empty suffix must leave the name unchanged.
        return column[:-len(suffix_name)] if suffix_name else column
    return column + suffix_name

def time_predict_feature_config_from_data_meta(data_meta: pd.DataFrame, past_length, future_length, retain=False):
    """Derive a ``TFRecordPipeLine`` feature config from a data-meta table.

    Rows are read with ``data_meta.itertuples()``; the attributes used are
    ``time_type``, ``feature_type``, ``column_name``, ``data_type``, ``label``
    and (only when ``retain`` is true) ``retain``.

    * A row whose ``time_type`` is ``static`` / ``future_known`` /
      ``future_unknown`` adds a context feature named
      ``feture_name(column_name)`` with base shape ``[]``,
      ``[past_length + future_length]`` or ``[past_length]``. Rows with
      ``feature_type == "dense"`` get an extra trailing dimension of 1 and dtype
      ``"float"``; all others get dtype ``"int"``.
    * If ``retain`` is true, rows with ``retain == "1"`` add a scalar context
      entry under the raw column name, typed by ``data_type``.
    * Rows with ``label == "1"`` add a ``[future_length, 1]`` context entry
      under the raw column name (replacing a retained entry of the same name).

    :return: dict with ``"context"``, ``"sequence"`` and ``"serialize"`` keys;
        only ``"context"`` is populated.
    """
    total_steps = past_length + future_length
    # Base shape per time type; copied into a fresh list for every row.
    base_shapes = {
        "static": (),
        "future_known": (total_steps,),
        "future_unknown": (past_length,),
    }

    context = {}
    for row in data_meta.itertuples():
        base = base_shapes.get(row.time_type)
        if base is not None:
            dense = row.feature_type == "dense"
            context[feture_name(row.column_name)] = {
                "shape": [*base, 1] if dense else list(base),
                "dtype": "float" if dense else "int",
            }

        if retain and row.retain == "1":
            context[row.column_name] = {"shape": [], "dtype": row.data_type}

        if row.label == "1":
            context[row.column_name] = {"shape": [future_length, 1], "dtype": row.data_type}

    return {"context": context, "sequence": {}, "serialize": {}}

class TFRecordPipeLine():
    """Batched ``tf.data`` input pipeline for TFRecord files.

    ``process`` builds: file list -> optional file shuffle -> interleaved
    per-file ``TFRecordDataset`` readers -> batch of serialized records ->
    one vectorized parse per batch -> optional cache / shuffle -> prefetch.

    :param feature_config: Feature description dict (layout documented on
        ``feature_config_decoder``). ``None`` is treated as ``{}``.
    :param batch_size: Number of records per batch.
    """

    def __init__(
            self,
            feature_config: "dict | None" = None,
            batch_size: int = 8,
        ):
        # Fresh dict per instance rather than a shared mutable default argument.
        if feature_config is None:
            feature_config = {}
        self.batch_size = batch_size
        self.feature_config = feature_config
        self.context_description, self.sequence_description = self.feature_config_decoder(feature_config)

    @staticmethod
    def data_type_transform(dtype: str, data='tf'):
        """Map a config dtype name to the TensorFlow dtype used for parsing.

        Matching is by prefix, so e.g. ``"int32"`` and ``"int"`` both map to
        ``tf.int64``.

        :param dtype: Config dtype name starting with ``int``, ``float`` or ``string``.
        :param data: Unused; kept for backward compatibility.
        :return: ``tf.int64``, ``tf.float32`` or ``tf.string``.
        :raises ValueError: if ``dtype`` is not a string with an accepted prefix.
        """
        if isinstance(dtype, str):
            if dtype.startswith("int"):
                return tf.int64
            if dtype.startswith("float"):
                return tf.float32
            if dtype.startswith("string"):
                return tf.string
        raise ValueError(f"only accept [int, float, string], but get {dtype}.")

    @staticmethod
    def feature_config_decoder(feature_config: dict):
        """
        Translate a feature config into ``tf.io`` parsing specs.

        :param feature_config: dict with optional ``context``, ``sequence`` and
            ``serialize`` sections, e.g.
            {
                "context": {
                    "x1": {"shape": [1], "dtype": "float"},
                    "x2": {"shape": [2], "dtype": "int"},
                    "x3": {"shape": [3], "dtype": "string"}
                },
                "sequence": {
                    "s1": {"dtype": "float"},
                    "s2": {"dtype": "int"}
                },
                "serialize": {
                    "d1": {"shape": [32, 32], "dtype": "float"},
                    "d2": {"shape": [24, 24], "dtype": "int"}
                }
            }
            - ``context``: ``FixedLenFeature`` when ``shape`` is given (with an
              optional ``default_val``), otherwise ``VarLenFeature``.
            - ``sequence``: ``VarLenFeature`` without ``shape``; with ``shape``
              (at least 2 dims, no ``default_val``) a ``FixedLenSequenceFeature``
              of shape ``shape[1:]``.
            - ``serialize``: one raw-bytes string per record, decoded in
              ``parse_example`` to ``dtype`` and reshaped to ``shape``.
        :return: (context_description, sequence_description)
        """
        # Specs for context features (serialize features are added here too).
        context_description = dict()
        # Specs for SequenceExample feature lists.
        sequence_description = dict()

        context_feature_info = feature_config.get("context")
        if context_feature_info is not None:
            for key, value in context_feature_info.items():
                _shape, _type = value.get("shape"), value.get("dtype")
                _data_type = TFRecordPipeLine.data_type_transform(dtype=_type, data='tf')
                if value.get("default_val") is None:
                    _def_val = None
                else:
                    _def_val = tf.constant(value.get("default_val"), dtype=_data_type, shape=_shape)
                if _shape is None:
                    context_description[key] = tf.io.VarLenFeature(dtype=_data_type)
                else:
                    context_description[key] = tf.io.FixedLenFeature(
                        shape=_shape, dtype=_data_type, default_value=_def_val
                    )

        sequence_feature_info = feature_config.get("sequence")
        if sequence_feature_info is not None:
            for key, value in sequence_feature_info.items():
                _data_type = TFRecordPipeLine.data_type_transform(dtype=value.get("dtype"), data='tf')
                _shape = value.get("shape")
                if _shape is None:
                    sequence_description[key] = tf.io.VarLenFeature(dtype=_data_type)
                else:
                    if len(_shape) < 2:
                        raise ValueError(f"You must provide at least 2 dims shape when choose `sequence` feature, "
                                         f"but got {_shape}.")
                    if value.get("default_val") is not None:
                        raise ValueError(f"`default_val` is not support when choose `shape` in `sequence` feature, "
                                         f"feature: {key}.")
                    # The first dim is the (variable) sequence length; only the
                    # per-step shape is given to the parser.
                    sequence_description[key] = tf.io.FixedLenSequenceFeature(
                        dtype=_data_type,
                        shape=_shape[1:]
                    )

        serialize_feature_info = feature_config.get("serialize")
        if serialize_feature_info is not None:
            for key, value in serialize_feature_info.items():
                if value.get("default_val") is None:
                    _def_val = None
                else:
                    # Encode the default with the same dtype parse_example decodes it
                    # with, so the raw bytes round-trip correctly.
                    _def_val = tf.constant(
                        value.get("default_val"),
                        dtype=TFRecordPipeLine.data_type_transform(value.get("dtype"), data='tf'),
                        shape=value.get("shape")
                    ).numpy().tobytes()
                context_description[key] = tf.io.FixedLenFeature(shape=[1], dtype=tf.string, default_value=_def_val)

        return context_description, sequence_description

    @staticmethod
    def parse_example(proto, context_description: dict, sequence_description: dict,
                      feature_config: dict, batch_size: int = 8):
        """
        Parse a batch of serialized records into one flat feature dict.

        :param proto: Batch of serialized records (``process`` passes the output
            of ``Dataset.batch``).
        :param context_description: Context feature specs.
        :param sequence_description: Sequence feature specs.
        :param feature_config: The full feature config.
        :param batch_size: Unused; kept for backward compatibility. The batch
            dimension is inferred, so a smaller final batch still parses.
        :return: dict of feature name -> tensor; variable-length features are
            densified and serialize features are decoded and reshaped.
        """
        if sequence_description:
            _context_feature, _sequence_feature, _ = tf.io.parse_sequence_example(
                proto,
                context_features=context_description,
                sequence_features=sequence_description
            )
        else:
            # No sequence features: ``Example.features`` and ``SequenceExample.context``
            # share the same proto field, so plain Example parsing yields the same
            # context tensors with the simpler op.
            _context_feature = tf.io.parse_example(proto, features=context_description)
            _sequence_feature = {}

        for key, value in _sequence_feature.items():
            _shape = feature_config.get("sequence").get(key).get("shape")
            _def_val = feature_config.get("sequence").get(key).get("default_val")
            if _shape is None:
                _sequence_feature[key] = tf.sparse.to_dense(value, default_value=_def_val)

        if feature_config.get("serialize") is not None:
            for key in feature_config.get("serialize").keys():
                _val = _context_feature[key]
                _origin_dtype = TFRecordPipeLine.data_type_transform(
                    feature_config.get("serialize").get(key).get("dtype"),
                    data="tf"
                )
                _origin_shape = feature_config.get("serialize").get(key).get("shape")
                _decode_val = tf.io.decode_raw(_val, _origin_dtype)
                # -1 infers the batch dimension: the final batch can be smaller, and
                # ``dummy_batch`` in ``process`` may differ from ``batch_size``.
                _decode_val = tf.reshape(_decode_val, [-1] + list(_origin_shape))
                _context_feature[key] = _decode_val

        if feature_config.get("context") is not None:
            for key in feature_config.get("context").keys():
                _shape = feature_config.get("context").get(key).get("shape")
                if _shape is None:
                    _val = _context_feature[key]
                    _def_val = feature_config.get("context").get(key).get("default_val")
                    _context_feature[key] = tf.sparse.to_dense(_val, default_value=_def_val)

        _feature = {**_context_feature, **_sequence_feature}

        return _feature

    def process(self, file_path, **kwargs):
        """
        Build the ``tf.data`` pipeline for a TFRecord file or directory.

        :param file_path: A file, or a directory whose files are all read
            (see ``local_path_decode``).
        :param kwargs: Optional pipeline settings:
            - ``is_train``: shuffle the file list (``file_shuffle_size``,
              ``shuffle_seed``, ``file_reshuffle_each_iteration``).
            - ``compression_type``, ``buffer_size``, ``num_parallel_reads``:
              forwarded to each per-file ``TFRecordDataset``.
            - ``cycle_length``, ``num_parallel_calls``: interleave / batch / map
              parallelism (``num_parallel_calls`` defaults to AUTOTUNE).
            - ``interleave_deterministic``, ``batch_deterministic``,
              ``map_deterministic``: per-stage ordering. When omitted,
              ``tf.data.Options.deterministic`` decides.
            - ``dummy_batch``: batch size override (defaults to ``batch_size``).
            - ``cache`` / ``cache_file``, ``shuffle_size`` /
              ``reshuffle_each_iteration``, ``prefetch_size``.
            - ``option``: attach ``tf.data.Options`` built from ``deterministic``
              and, only if given, ``max_intra_op_parallelism`` and
              ``private_threadpool_size``.
        :return: ``tf.data.Dataset`` yielding dicts of parsed feature tensors.
        :raises ValueError: if ``file_path`` is invalid or contains no files.
        """
        parse_function = tf.function(
            partial(self.parse_example,
                    context_description=self.context_description,
                    sequence_description=self.sequence_description,
                    feature_config=self.feature_config,
                    batch_size=self.batch_size)
        )
        num_parallel_calls = kwargs.get("num_parallel_calls", tf.data.AUTOTUNE)

        tfrecord_file = self.local_path_decode(file_path=file_path)
        if not tfrecord_file:
            # An empty filename list would otherwise fail later with a less obvious error.
            raise ValueError(f"local_path: `{file_path}` contains no input files.")

        dataset = tf.data.Dataset.from_tensor_slices(tfrecord_file)
        if kwargs.get("is_train", False):
            dataset = dataset.shuffle(
                buffer_size=kwargs.get("file_shuffle_size", 4),
                seed=kwargs.get("shuffle_seed"),
                reshuffle_each_iteration=kwargs.get("file_reshuffle_each_iteration", False)
            )

        # Per-stage ``deterministic`` defaults to None so tf.data.Options decides;
        # an explicit True here would override ``deterministic=False`` set via Options.
        #
        # Each interleaved reader gets a single file. ``num_parallel_reads`` only
        # parallelizes across the files handed to one TFRecordDataset, so it
        # defaults to None; file-level parallelism comes from ``interleave``.
        dataset = dataset.interleave(
            lambda filename: tf.data.TFRecordDataset(
                filename,
                compression_type=kwargs.get("compression_type"),
                buffer_size=kwargs.get("buffer_size"),
                num_parallel_reads=kwargs.get("num_parallel_reads"),
            ),
            cycle_length=kwargs.get("cycle_length"),
            num_parallel_calls=num_parallel_calls,
            deterministic=kwargs.get("interleave_deterministic"),
        )

        # Batch serialized records before parsing so each parse call handles a whole batch.
        dataset = dataset.batch(
            batch_size=kwargs.get("dummy_batch", self.batch_size),
            num_parallel_calls=num_parallel_calls,
            deterministic=kwargs.get("batch_deterministic"),
        )

        dataset = dataset.map(
            parse_function,
            num_parallel_calls=num_parallel_calls,
            deterministic=kwargs.get("map_deterministic"),
        )

        if kwargs.get("cache", False):
            dataset = dataset.cache(filename=kwargs.get("cache_file", ""))
        if kwargs.get("shuffle_size"):
            dataset = dataset.shuffle(
                buffer_size=kwargs.get("shuffle_size"),
                seed=kwargs.get("shuffle_seed"),
                reshuffle_each_iteration=kwargs.get("reshuffle_each_iteration", False)
            )
        dataset = dataset.prefetch(kwargs.get("prefetch_size", tf.data.AUTOTUNE))

        if kwargs.get("option", False):
            # Original author's notes (translated): without Options, some settings
            # (all ``deterministic`` flags False, or all ``num_parallel_calls`` on
            # AUTOTUNE) appeared to produce no output; with Options, batching and
            # then rebatching was not possible.
            options = tf.data.Options()
            # Threading overrides are applied only when explicitly requested:
            # max_intra_op_parallelism=1 disables intra-op parallelism, and a
            # private threadpool bounds the threads the pipeline may use.
            max_intra = kwargs.get("max_intra_op_parallelism")
            if max_intra is not None:
                options.threading.max_intra_op_parallelism = max_intra
            pool_size = kwargs.get("private_threadpool_size")
            if pool_size is not None:
                options.threading.private_threadpool_size = pool_size
            options.deterministic = kwargs.get("deterministic", True)
            dataset = dataset.with_options(options)
        return dataset

    @staticmethod
    def local_path_decode(file_path):
        """Expand ``file_path`` into a sorted list of absolute file paths.

        A file yields a one-element list. For a directory, its direct
        (non-recursive) regular files are returned, skipping names that start
        with ``_``, ``~`` or ``.`` (such as ``_SUCCESS`` or dotfiles).

        :raises ValueError: if ``file_path`` is neither a file nor a directory.
        """
        local_path = os.path.abspath(file_path)
        if os.path.isfile(file_path):
            return [local_path]
        if not os.path.isdir(file_path):
            raise ValueError(f"local_path: `{file_path}` is invalid.")
        # Sorted because os.listdir order is arbitrary; this keeps file (and
        # therefore record) order reproducible.
        return [
            os.path.join(local_path, name)
            for name in sorted(os.listdir(local_path))
            if not name.startswith(("_", "~", "."))
            and os.path.isfile(os.path.join(local_path, name))
        ]

import datetime

# NOTE(review): ``data_meta`` and ``seq_l2`` are assumed to be defined earlier
# in the notebook/script; they are not shown here.
feature_config = time_predict_feature_config_from_data_meta(data_meta, past_length=93, future_length=62)
tp_t = TFRecordPipeLine(feature_config, batch_size=2**14)
# NOTE(review): private_threadpool_size bounds the threads this pipeline may use,
# and num_parallel_reads only parallelizes across files given to one reader.
# Try dropping these overrides to compare against TensorFlow's defaults.
train_dataset = tp_t.process(
    os.path.join(seq_l2, "tp_type=train"),
    compression_type="GZIP",
    num_parallel_reads=16,
    prefetch_size=3,
    deterministic=False,
    option=True,
    max_intra_op_parallelism=64,
    private_threadpool_size=32,
    is_train=True,
)

# Print the wall time of each of the first ``num_batches`` batches. The first
# interval includes pipeline start-up.
num_batches = 100
init_start = start = end = datetime.datetime.now()  # ``end`` stays valid if no batch arrives
for _ in train_dataset.take(num_batches):
    end = datetime.datetime.now()
    print(end - start)
    start = end
print(f"{num_batches} batch total use time:", end - init_start)

My CPU usage is only about 10%, and each batch takes roughly 0.5 s to 1 s, which is too slow.

0:00:03.311197
0:00:00.155027
0:00:00.290817
0:00:00.860181
0:00:00.594197
0:00:00.523642
0:00:00.982845
0:00:01.098462
0:00:01.007237
0:00:01.214216
0:00:01.069441
0:00:00.555538
0:00:01.148678
0:00:01.417670
0:00:00.513149
0:00:00.769899
0:00:00.917635
0:00:01.288421
0:00:00.880603
0:00:01.094904
0:00:00.973094
0:00:00.787760
0:00:01.365806
0:00:02.136255
0:00:00.089710
...
0:00:00.364601
0:00:01.825834
0:00:00.003376
100 batch total use time: 0:02:20.718250

When I train my model (an LSTM), GPU usage often drops to 0%. I think the GPU is waiting for data from the CPU during those periods. [screenshot]

I have tried increasing num_parallel_calls, max_intra_op_parallelism, private_threadpool_size, buffer_size, and so on, but none of it helps. Why? Even when I increase these parameters, my CPU usage stays at about 10%. I know that setting num_parallel_calls too high can cause thread contention. So what can I do?

0