BerandaComputers and TechnologyMembuat pipeline machine learning berskala besar dengan MinIO dan TensorFlow

Membuat pipeline machine learning berskala besar dengan MinIO dan TensorFlow

Kita hidup di era transformatif yang ditentukan oleh informasi dan AI. Sejumlah besar data dihasilkan dan dikumpulkan setiap hari untuk memberi makan algoritma AI / ML yang rakus dan canggih ini. Semakin banyak data, semakin baik hasilnya.

Salah satu kerangka kerja yang muncul sebagai standar industri utama adalah TensorFlow Google . Sangat serbaguna, seseorang dapat memulai dengan cepat dan menulis model sederhana dengan Kerangka kerja Keras . Jika Anda mencari pendekatan yang lebih canggih, TensorFlow juga memungkinkan Anda membuat model machine learning Anda sendiri menggunakan API tingkat rendah. Apa pun strategi yang Anda pilih, TensorFlow akan memastikan bahwa algoritme Anda dioptimalkan untuk infrastruktur apa pun yang Anda pilih untuk algoritme Anda – baik itu CPU , GPU atau TPU .

Karena set data menjadi terlalu besar untuk dimasukkan ke dalam memori atau disk lokal, pipeline AI / ML sekarang memiliki persyaratan untuk memuat data dari sumber data eksternal. Ambil contoh ImageNet set data dengan 14 Juta Gambar dengan perkiraan ukuran penyimpanan 1,31TB . Kumpulan data ini tidak dapat dimasukkan ke dalam memori atau di drive penyimpanan lokal mesin mana pun. Tantangan ini semakin rumit jika pipeline Anda berjalan di dalam lingkungan tanpa negara seperti Kubernetes (yang semakin menjadi norma).

Standar yang muncul untuk masalah ini adalah menggunakan penyimpanan objek berkinerja tinggi dalam desain pipeline AI / ML Anda. MinIO adalah pemimpin dalam bidang ini dan telah menerbitkan sejumlah tolok ukur yang berbicara tentang kemampuan throughputnya. Dalam postingan ini, kami akan membahas cara memanfaatkan MinIO untuk project TensorFlow Anda.

Saluran Data Skala Hiper Empat Tahap

Untuk membangun pipa skala hiper, kita akan memiliki masing-masing tahap pipa dibaca dari MinIO. Dalam contoh ini kita akan membuat empat tahap pipeline machine learning. Arsitektur ini akan memuat data yang diinginkan sesuai permintaan dari MinIO.

Pertama, kita akan memproses set data sebelumnya dan menyandikannya dalam format yang dapat dicerna dengan cepat oleh TensorFlow. Format ini adalah tf.TFRecord , yang merupakan jenis pengkodean biner untuk data kami. Kami mengambil langkah ini karena kami tidak ingin membuang waktu untuk memproses data selama pelatihan karena kami berencana untuk memuat setiap batch pelatihan langsung dari MinIO sesuai kebutuhan. Jika data diproses sebelumnya sebelum kami memasukkannya ke dalam pelatihan model, kami menghemat banyak waktu. Idealnya, kami membuat potongan data yang telah diproses sebelumnya yang mengelompokkan potongan data yang bagus – setidaknya 100-200MB dalam ukuran.

Untuk mempercepat tahap pemuatan data dan pelatihan, kami akan memanfaatkan sangat baik tf.data api. API ini dirancang untuk memuat data secara efisien selama pelatihan / validasi model kami. Ini mempersiapkan batch data berikutnya karena yang sekarang sedang diproses oleh model. Keuntungan dari pendekatan ini adalah memastikan pemanfaatan yang efisien dari GPU atau TPU mahal yang tidak dapat diam karena lambatnya pemuatan data. MinIO tidak mengalami masalah ini – dapat memenuhi jaringan 100Gbps dengan beberapa drive NVMe atau juga dengan Hard Disk Drive memastikan file pipeline memproses data secepat yang dimungkinkan oleh perangkat keras.

Selama pelatihan kami ingin memastikan bahwa kami menyimpan pos pemeriksaan pelatihan model kami serta histogram TensorBoard . Checkpoint berguna jika pelatihan terganggu dan kita ingin melanjutkan pelatihan atau jika kita mendapatkan lebih banyak data dan ingin terus melatih model kita dengan data baru dan histogram TensorBoard memungkinkan kita melihat bagaimana pelatihan berjalan saat itu terjadi. TensorFlow mendukung penulisan keduanya secara langsung ke MinIO.

Catatan sampingan cepat. Ketika model selesai, kami akan menyimpannya ke MinIO juga – memungkinkan kami untuk menyajikannya menggunakan Penyajian TensorFlow – tetapi itu adalah postingan untuk waktu lain.

Pipa End-to-End menggunakan MinIO

Membangun Pipa

Untuk pipeline skala-hiper kami, kami akan menggunakan kumpulan data yang dapat dengan mudah masuk ke komputer lokal Anda sehingga Anda dapat mengikutinya. The Set Data Ulasan Film Besar dari Stanford sangat bagus karena memiliki sampel dalam jumlah besar (25.000 untuk pelatihan dan 25.000 untuk pengujian) jadi kami akan membuat model analisis sentimen yang akan memberi tahu kami apakah ulasan film itu positif atau negatif. Ingatlah bahwa setiap langkah dapat diterapkan ke kumpulan data yang lebih besar. Keuntungan dari kumpulan data ini adalah Anda dapat mencoba di komputer Anda sendiri. Mari kita mulai!

Unduh kumpulan data dan unggah ke MinIO menggunakan Klien MinIO

  curl -O http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar .gz mc mb myminio / dataset mc cp aclImdb_v1.tar.gz myminio / dataset /  

Mari kita mulai dengan mendeklarasikan beberapa konfigurasi untuk pipeline kita, seperti ukuran batch , lokasi dataset kami dan tetap benih acak sehingga kami dapat menjalankan pipeline ini berulang kali dan mendapatkan hasil yang sama.

  random_seed=44 batch_size=128 dataset_bucket='dataset' preprocessed_data_folder='data-praproses' tf_record_file_size=500 # Atur benih acak tf.random.set_seed (random_seed)  # Cara mengakses MinIO minio_address='localhost: 9000' minio_access_key='minioadmin' minio_secret_key='minioadmin'  

Kita akan mengunduh kumpulan data kami dari MinIO menggunakan minio-py

minioClient=Minio (alamat_mini, access_key=minio_access_key, secret_key=minio_secret_key, aman=Salah) mencoba: minioClient.fget_object ( dataset_bucket, 'aclImdb_v1.tar.gz', '/tmp/dataset.tar.gz') kecuali ResponseError sebagai err: cetak (err)

Sekarang mari buka kompresi set data ke folder sementara ( / tmp / dataset ) untuk memproses data kami

ekstrak_folder=f '/ tmp / {dataset_bucket} /' dengan tarfile.open ("/ tmp / dataset.tar.gz", "r: gz") sebagai tar: tar.extractall (path=extract_folder)

Pra-Pemrosesan

Karena struktur dataset yang akan kita gunakan untuk membaca dari empat folder, awalnya tes dan melatih yang menahan 25.000 contoh masing-masing, lalu, di setiap folder yang kami miliki 12.500 dari setiap label pos untuk komentar positif dan neg untuk komentar negatif. Dari empat folder ini, kami akan menyimpan semua sampel menjadi dua variabel, melatih dan uji. Jika kami melakukan pra-pemrosesan kumpulan data yang tidak bisa muat di mesin lokal kami cukup memuat segmen objek, satu per satu dan memprosesnya juga.

  kereta=[] tes=[]  dirs_to_read=[    'aclImdb/train/pos',    'aclImdb/train/neg',    'aclImdb/test/pos',    'aclImdb/test/neg',]  untuk dir_name di dirs_to_read:     parts=dir_name.split ("/")     set data=bagian [1]     label=bagian [2]     untuk nama file di os.listdir (os.path.join (extract_folder, dir_name)):         dengan open (os.path.join (extract_folder, dir_name, filename), 'r') sebagai f:             konten=f.read ()             jika set data=="kereta":                 train.append ({                     "teks": konten,                     "label": label                 })             set data elif=="test":                 test.append ({                     "teks": konten,                     "label": label                 })  

Kami kemudian akan mengocok kumpulan data sehingga kami tidak memasukkan bias ke dalam pelatihan dengan memberikan 12.500 contoh positif berurutan diikuti dengan 12.500 contoh negatif berurutan. Model kami akan kesulitan menggeneralisasikannya. Dengan mengacak data, model akan melihat dan belajar dari contoh positif dan negatif pada saat yang bersamaan.

  random.Random (random_seed). shuffle (train) random.Random (random_seed) .shuffle (test)  

Karena kita berurusan dengan teks, kita perlu mengubah teks menjadi representasi vektor yang secara akurat menggambarkan makna kalimat. Jika kita berurusan dengan gambar, kita akan mengubah ukuran gambar dan mengubahnya menjadi representasi vektor yang setiap piksel menjadi nilai dari gambar yang diubah ukurannya.

Namun, untuk teks, kami memiliki tantangan yang lebih besar karena sebuah kata tidak memiliki representasi numerik. Di sinilah embeddings berguna. Embedding adalah representasi vektor dari beberapa teks, dalam hal ini kita akan merepresentasikan keseluruhan ulasan sebagai vektor tunggal dengan 512 dimensi. Alih-alih melakukan pra-pemrosesan teks secara manual (membuat token, membangun kosakata, dan melatih lapisan embeddings), kami akan memanfaatkan model yang ada yang disebut GUNAKAN (Universal Sentence Encoder) untuk menyandikan kalimat menjadi vektor sehingga kami dapat melanjutkan contoh kita. Ini adalah salah satu keajaiban deep learning, kemampuan untuk menggunakan kembali model yang berbeda bersama model Anda. Di sini kami menggunakan TensorFlow Hub dan kami akan memuat yang terbaru MENGGUNAKAN model.

impor tensorflow_hub sebagai penghubung embed=hub.load ("https://tfhub.dev/google/universal-sentence-encoder-large/5")

Karena terlalu banyak membuat embeddings dari 25.000 kalimat dan menyimpannya dalam memori, kita akan membagi dataset kita menjadi beberapa bagian 500 .

Untuk menyimpan data kami ke dalam TFRecord kita perlu menyandikan fitur sebagai tf.train .Fitur . Kami akan menyimpan label data kami sebagai daftar tf.int64 dan Review Film kami sebagai daftar float sejak setelah kami menyandikan kalimat menggunakan MENGGUNAKAN kami akan berakhir dengan embedding 512 dimensi

def _embedded_sentence_feature (nilai): return tf.train.Feature (float_list=tf.train.FloatList (nilai=nilai)) def _label_feature (nilai): return tf.train.Feature (int64_list=tf.train.Int64List (nilai=nilai)) def encode_label (label): jika label=="pos": return tf.constant ([1,0]) elif label=="neg": return tf.constant ([0,1]) # Ini akan mengambil label dan kalimat yang disematkan dan menyandikannya sebagai tf.TFRecord def serialize_example (label, kalimat_tensor): fitur={ 'kalimat': _embedded_sentence_feature (kalimat_tensor [0]), 'label': _label_feature (label), } example_proto=tf.train.Example (fitur=tf.train.Features (fitur=fitur)) kembalikan example_proto def process_examples (catatan, prefix=""): starttime=timeit.default_timer () total_training=len (catatan) cetak (f "Total dari {total_training} elemen") total_batches=math.floor (pelatihan_ total / tf_record_file_size) jika total_training% tf_record_file_size!=0: total_batches +=1 cetak (f "Total {total_batches} file dari {tf_record_file_size} record") counter=0 file_counter=0 penyangga=[] file_list=[] untuk saya dalam jangkauan (len (catatan)): counter +=1 kalimat_embedding=embed ([records[i] ['text']]) label_encoded=encode_label (catatan [i] ['label']) record=serialize_example (label_encoded, kalimat_embedding) buffer.append (catatan) jika penghitung>=tf_record_file_size: print (f "Records in buffer {len (buffer)}") # simpan contoh buffer ini sebagai file ke MinIO counter=0 file_counter +=1 file_name=f "{prefix} _file {file_counter} .tfrecord" dengan open (nama_file, 'w +') sebagai f: dengan tf.io.TFRecordWriter (f.name, options="GZIP") sebagai penulis: misalnya di buffer: writer.write (contoh.SerializeToString ()) mencoba: minioClient.fput_object (dataset_bucket, f "{preprocessed_data_folder} / {file_name}", nama_file) kecuali ResponseError sebagai err: cetak (err) file_list.append (nama_file) os.remove (nama_file) penyangga=[] cetak (f "Selesai dengan potongan {file_counter} / {total_batches} - {timeit.default_timer () - starttime}") jika len (buffer)> 0: file_counter +=1 file_name=f "file {file_counter} .tfrecord" dengan open (nama_file, 'w +') sebagai f: dengan tf.io.TFRecordWriter (f.name) sebagai penulis: misalnya di buffer: writer.write (contoh.SerializeToString ()) mencoba: minioClient.fput_object (dataset_bucket, f "{preprocessed_data_folder} / {file_name}", nama_file) kecuali ResponseError sebagai err: cetak (err) file_list.append (nama_file) os.remove (nama_file) penyangga=[] print ("Total waktu preprocessing adalah:", timeit.default_timer () - starttime) kembali file_list process_examples (kereta, prefix="train") process_examples (tes, prefix="test") print ("Selesai memproses data!")

Pada titik ini kami selesai memproses data kami. Kami memiliki satu set . file tfrecord yang disimpan di ember. Sekarang kita akan memasukkannya ke model yang memungkinkannya mengonsumsi dan melatihnya secara bersamaan.

Latihan

Kami akan mendapatkan daftar file (data pelatihan) dari MinIO. Secara teknis, tahap pra-pemrosesan dan tahap pelatihan dapat sepenuhnya dipisahkan, jadi sebaiknya daftar potongan file yang kita miliki di bucket.

  # Cantumkan semua file tfrecord pelatihan object=minioClient.list_objects_v2 (dataset_bucket, prefix=f "{preprocessed_data_folder} / train") training_files_list=[] untuk obj di objek:     training_files_list.append (obj.object_name) # Cantumkan semua file tfrecord pengujian objek=minioClient.list_objects_v2 (dataset_bucket, prefix=f "{preprocessed_data_folder} / test") testing_files_list=[] untuk obj di objek:     testing_files_list.append (obj.object_name)  

Untuk mendapatkan TensorFlow terhubung ke MinIO we akan memberitahunya lokasi dan detail koneksi dari instance MinIO kami.

  os.environ ['AWS_ACCESS_KEY_ID']=minio_access_key os.environ ['AWS_SECRET_ACCESS_KEY']=minio_secret_key os.environ ['AWS_REGION']="us-east-1" os.environ ['S3_ENDPOINT']=minio_address os.environ ['S3_USE_HTTPS']="0" os.environ ['S3_VERIFY_SSL']="0"  

Sekarang mari kita buat tf.data.Dataset yang memuat catatan dari file kami di MinIO saat mereka dibutuhkan. Untuk melakukan itu kita akan mengambil daftar file yang kita miliki dan memformatnya dengan cara yang merujuk pada lokasi objek sebenarnya. Kami akan melakukan ini juga untuk pengujian dataset.

all_training_filenames=[f"s3://datasets/{f}" for f in training_files_list] testing_filenames=[f"s3://datasets/{f}" for f in testing_files_list]

Langkah berikut ini opsional, tetapi saya merekomendasikannya. Saya akan membagi dataset pelatihan saya menjadi dua set, 90% dari data untuk pelatihan dan 10% data untuk validasi, model tidak akan mempelajari data validasi tetapi akan membantu model melatih lebih baik.

  total_train_data_files=math.floor (len (all_training_filenames) 0,9) jika total_train_data_files==len (all_training_filenames):     total_train_data_files -=1 training_files=all_training_filenames [0:total_train_data_files] validation_files=all_training_filenames [total_train_data_files:]  

Sekarang mari kita buat tf.data set data:

  AUTO=tf.data.experimental.AUTOTUNE ignore_order=tf.data.Options () ignore_order.experimental_deterministic=False  dataset=tf.data.TFRecordDataset (training_files, num_parallel_reads=AUTO, compression_type="GZIP") dataset=dataset.with_options (abaikan_order)  validation=tf.data.TFRecordDataset (validation_files, num_parallel_reads=AUTO, compression_type="GZIP") validation=validation.with_options (ignore_order)  testing_dataset=tf.data.TFRecordDataset (testing_filenames, num_parallel_reads=AUTO, compression_type="GZIP") testing_dataset=testing_dataset.with_options (ignore_order)  

Untuk memecahkan kode kami TFRecord file yang disandikan kami akan membutuhkan fungsi decoding yang melakukan kebalikan dari kami contoh_serialisasi fungsi. Sejak data keluar dari TFRecord memiliki bentuk (512,) dan (2,) masing-masing, kami akan membentuknya kembali karena itulah formatnya model kami akan mengharapkan untuk menerima.

def decode_fn (record_bytes): schema={ "label": tf.io.FixedLenFeature ([2], dtype=tf.int64), "kalimat": tf.io.FixedLenFeature ([512], dtype=tf.float32), } tf_example=tf.io.parse_single_example (record_bytes, schema) new_shape=tf.reshape (tf_example ['sentence'], [1,512]) label=tf.reshape (tf_example ['label'], [1,2]) kembalikan bentuk_baru, label

Ayo membangun model kami, tidak ada yang mewah, saya hanya akan menggunakan beberapa lapisan Padat dengan softmax aktivasi di akhir. Kami mencoba untuk memprediksi apakah inputnya positif atau negatif jadi kita akan mendapatkan probabilitas kemungkinannya setiap.

model=keras. Sekuensial() model.add ( keras.layers.Dense ( unit=256, bentuk_input=(1.512), aktivasi='relu' ) ) model.add ( keras.layers.Dropout (rate=0,5) ) model.add ( keras.layers.Dense ( unit=16, aktivasi='relu' ) ) model.add ( keras.layers.Dropout (rate=0,5) ) model.add (keras.layers.Dense (2, activation='softmax')) model.compile ( loss='kategorikal_kategori', optimizer=keras.optimizers.Adam (0,001), metrik=['accuracy'] )

Struktur dari model Deep Learning kami

Mari persiapkan kumpulan data kita untuk tahap pelatihan dengan meminta mereka mengulang sedikit dan batch 128 item pada satu waktu

mapped_ds=dataset.map (decode_fn) mapped_ds=mapped_ds. ulangi (5) mapped_ds=mapped_ds.batch (128) mapped_validation=validation.map (decode_fn) mapped_validation=mapped_validation.repeat (5) mapped_validation=mapped_validation.batch (128) testing_mapped_ds=testing_dataset.map (decode_fn) testing_mapped_ds=testing_mapped_ds. ulangi (5) testing_mapped_ds=testing_mapped_ds.batch (128)

Saat kami berlatih, kami ingin menyimpan pos pemeriksaan model kami jika pelatihan terganggu dan kami ingin melanjutkan dari tempat kami tinggalkan. Untuk melakukan ini, kita akan menggunakan panggilan balik keras tf.keras.callbacks.ModelCheckpoint

agar TensorFlow menyimpan pos pemeriksaan ke MinIO setelah setiap periode. checkpoint_path=f "s3: // {dataset_bucket} /checkpoints/cp.ckpt" cp_callback=tf.keras.callbacks.ModelCheckpoint (filepath=checkpoint_path, save_weights_only=Benar, verbose=1)

Kami juga ingin menyimpan TensorBoard histogram jadi kami akan melakukannya tambahkan callback untuk menyimpannya di bucket kami di bawah log / imdb / awalan. Kami mengidentifikasi proses ini dengan model_note dan arus kali, ini agar kami dapat membedakan berbagai contoh pelatihan.

model_note="256-masukan" logdir=f "s3: // {dataset_bucket} / logs / imdb / {model_note} -" + datetime.now (). strftime ("% Y% m% d-% H% M% S") tensorboard_callback=tf.keras.callbacks.TensorBoard (log_dir=logdir)

Akhirnya kami akan melatih model:

  history=model.fit (     mapped_ds,     zaman=10,     panggilan balik=[cp_callback, tensorboard_callback],     validation_data=mapped_validation )  

Jika kami menjalankan mc admin melacak myminio kita bisa melihat TensorFlow membaca data langsung dari MinIO, tetapi hanya bagian yang dibutuhkan:

TensorFlow membaca Konten Parsial dari file kami yang berisi data

Sekarang setelah memiliki model, kami ingin menyimpannya ke MinIO:

  model.save (f "s3: // {dataset_bucket} / imdb_sentiment_analysis")  

Mari kita uji model kita dan lihat caranya itu melakukan:

pengujian=model. evaluasi (testing_mapped_ds)

Ini kembali Akurasi 85,63%, bukan yang canggih, tetapi juga tidak buruk untuk contoh sederhana seperti itu.

Ayo lari TensorBoard untuk mengeksplorasi model kami memuat data langsung dari MinIO

AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin AWS_REGION=us-east-1 S3_ENDPOINT=localhost: 9000 S3_USE_HTTPS=0 S3_VERIFY_SSL=0 tensorboard --logdir s3: // dataset / logs

Lalu pergi ke http: // localhost: 6006 di browser Anda

Kita bisa bermain dengan model kita dan melihat jika saya karya t

sampel=[ "This movie sucks", "This was extremely good, I loved it.", "great acting", "terrible acting", "pure kahoot", "This is not a good movie",] sample_embedded=embed (sampel) res=model.predict (sample_embedded) untuk s dalam kisaran (len (sampel)): jika res [s] [0]> res [s] [1]: cetak (f "{sampel [s]} - positif") lain: cetak (f "{sampel [s]} - negatif")

Ini mengembalikan output berikut

  Film ini menyebalkan - negatif Ini sangat bagus, saya menyukainya. - positif akting yang bagus - positif akting yang mengerikan - negatif kahoot murni - positif Ini bukan film yang bagus - negatif  

Kesimpulan

Seperti yang ditunjukkan, Anda dapat membangun AI / ML skala besar jaringan pipa yang sepenuhnya dapat mengandalkan MinIO. Ini adalah fungsi dari karakteristik kinerja MinIO, tetapi juga kemampuannya untuk menskalakan data Petabytes dan Exabytes dengan mulus. Dengan memisahkan penyimpanan dan komputasi, seseorang dapat membangun kerangka kerja yang tidak bergantung pada sumber daya lokal - memungkinkan Anda untuk menjalankannya pada wadah di dalam Kubernetes. Ini menambah fleksibilitas yang cukup.

Anda dapat melihat bagaimana TensorFlow dapat memuat data seperti yang diperlukan dan tidak perlu penyesuaian sama sekali, itu hanya berfungsi . Selain itu, pendekatan ini dapat dengan cepat diperluas ke pelatihan dengan berlari TensorFlow secara terdistribusi . Ini memastikan bahwa hanya ada sedikit data untuk diacak melalui jaringan antara node pelatihan karena MinIO menjadi satu-satunya sumber data tersebut.

Kode untuk posting ini adalah tersedia di Github di: https://github.com/dvaldivia/hyper-scale-tensorflow-with -minio

Read More

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments