Bài 22 - Cơ chế truyền dữ liệu giữa các goroutine


  • Trùm cuối

    Việc trao đổi dữ liệu giữa các goroutine trong xử lý đồng thời là rất thường xuyên. Giải quyết vấn đề đó bằng cách sử dụng các biến chung rất hay được áp dụng nhưng đi kèm với nó là xử lý xung đột như đã thấy ở bài trước. Go cung cấp một cơ chế trao đổi khác giúp cho việc trao đổi dữ liệu giữa các goroutine mượt mà hơn. Đó là kênh (channel).

    4391e026-1203-421b-ae1f-e73f58e1701d-image.png

    Mỗi một kênh được tạo ra phục vụ việc trao đổi dữ liệu của một kiểu dữ liệu nhất định. Chúng ta khai báo một kênh bằng hàm make. Ví dụ để tạo một kênh trao đổi số nguyên, chúng ta có thể khai báo như sau: ch := make (chan int). Biến ch được tạo ra thuộc kiểu dữ liệu tham chiếu nên khi sao chép nó hay dùng nó làm tham số thì thật sự nó tham chiếu đến cùng một cấu trúc dữ liệu lưu bên dưới. Do là kiểu tham chiếu nên giá trị mặc định là nil. Hai biến kiểu này có thể so sánh bằng và chúng bằng nhau khi cùng tham chiếu đến một cấu trúc dữ liệu lưu bên dưới.

    Hai hành động đi kèm với kênh là gửinhận. Hành động gửi được xem là thao tác ghi dữ liệu vào kênh của một goroutine và goroutine khác đọc dữ liệu từ kênh này được xem là hành động nhận. Sau hành động nhận, dữ liệu sẽ được giải phóng khỏi kênh. Cả hai hành động này đều được khai báo bởi toán tử <-

    • Hành động gửi thì toán tử <- nằm giữa biến kênh và dữ liệu cần gửi. Ví dụ: ch <- x với x là biến chứa giá trị cần gửi.
    • Hành động nhận thì toán tử <- nằm trước biến kênh. Ví dụ: y = <- ch. Trong trường hợp muốn giải phóng dữ liệu khỏi kênh mà không cần lấy giá trị của nó, chúng ta có thể dùng: <- ch .

    Trong trường hợp muốn khai báo kênh chỉ gửi hoặc chỉ nhận, chúng ta khai báo như sau: ch := make (chan<- int) cho kênh chỉ gửi và ch := make (<-chan int) cho kênh chỉ nhận.

    Một hành động khác cho kênh được Go cung cấp là đóng kênh. Lúc này việc gửi vào kênh này sẽ bị báo lỗi panic. Việc nhận ở kênh đã đóng vẫn được diễn ra cho đến khi hết dữ liệu lưu ở kênh. Lúc này nếu tiếp tục đọc sẽ nhận giá trị zero mặc định tùy thuộc kiểu dữ liệu kênh. Để đóng kênh, chúng ta dùng hàm close: close (ch).

    Go cung cấp hai loại kênh khác nhau: kênh đơn (unbuffered channel) và kênh đa (buffered channel). Loại kênh ch như khai báo ở trên là kênh đơn. Kênh đa cũng được khai báo tương tự như có thêm số giá trị dữ liệu có thể chứa. Chúng ta cùng tìm hiểu hai loại kênh này ngay sau đây.

    Kênh đơn là kênh chỉ chứa tối đa một giá trị dữ liệu. Khi thực hiện một hành động gửi dữ liệu vào kênh đơn, goroutine này sẽ bị khóa cho đến khi có goroutine khác nhận dữ liệu thì goroutine này mới tiếp tục được thực thi. Ngược lại nếu một goroutine nhận dữ liệu từ một kênh nhưng kênh chưa có dữ liệu sẵn thì nó sẽ bị khóa cho đến khi có dữ liệu được gửi vào kênh. Trao đổi dữ liệu ở kênh đơn buộc các goroutine gửi và nhận dữ liệu phải được đồng bộ nên đôi lúc kênh đơn còn gọi là kênh đồng bộ (synchronous channel).

    a6987692-1f47-4a58-8fbc-c5a661b4cd94-image.png

    • Bước 1: cả 2 goroutine (GR) gửi và nhận đều ở trạng thái chuẩn bị gửi/nhận.
    • Bước 2: GR gửi bắt đầu gửi dữ liệu vào kênh, lúc này nó sẽ bị khóa hoạt động.
    • Bước 3, GR nhận bắt đầu nhận dữ liệu.
    • Bước 4 và 5 mô tả quá trình nhận.
    • Bước 6 việc nhận hoàn tất, GR gửi sẽ được mở khóa để thực thi tiếp.

    Chúng ta cùng tìm hiểu cách sử dụng kênh đơn qua ví dụ mô phỏng một ván đấu quần vợt. Một ván đấu bắt đầu khi một người chơi giao bóng, người còn lại chờ bóng qua rồi đánh trả về lại. Một người chơi luôn ở một trong hai trạng thái: hoặc đánh bóng qua phía đối thủ hoặc chờ nhận bóng do đối thủ đánh sang:

     package main 
      
     import ( 
         "fmt" 
         "math/rand" 
         "sync" 
         "time" 
     ) 
      
     // wg được sử dụng để đợi các goroutine kết thúc 
     var wg sync.WaitGroup 
      
     // Hàm này chạy trước khi các lệnh hàm main được thực thi 
     func init() { 
         rand.Seed(time.Now().UnixNano()) // Tạo nhân gieo giá trị ngẫu nhiên 
     } 
      
     func main() { 
         court := make(chan int) 
      
         wg.Add(2) 
      
         // Tạo 2 người chơi 
         go player("Federer", court) 
         go player("Djokovic", court) 
      
         // Bắt đầu phát bóng cho một ván đấu 
         court <- 1 
      
         wg.Wait()   // Đợi ván đấu kết thúc 
         fmt.Println("Ván đấu kết thúc!") 
     } 
      
     // Hàm mô tả một người chơi quần vợt 
     func player(name string, court chan int) { 
         // Thông báo người này chơi xong ván đấu 
         defer wg.Done() 
      
         for { 
             // Đợi banh từ đối thủ 
             ball, ok := <-court 
             if !ok { 
                // Thắng nếu kênh đã đóng 
                 fmt.Printf("%s thắng!\n", name) 
                 return 
             } 
      
             // Lấy một giá trị ngẫu nhiên 0 - 99 
             n := rand.Intn(100) 
             if n%13 == 0 { 
                 fmt.Printf("%s đánh hỏng ở lượt đánh thứ %d!\n", name, ball) 
      
                 // Đóng kênh khi đánh hỏng 
                 close(court) 
                 return 
             } 
      
             fmt.Printf("Lượt đánh bóng thành công thứ %d: %s\n", ball, name) 
             ball++ 
      
             // Đánh banh về lại đối thủ 
             court <- ball 
         } 
     } 
    
    • sync.WaitGroup là cơ chế giúp chương trình trình ngưng lại chờ cho đến khi đủ điều kiện. Chúng ta cần đến nó để đảm bảo chương trình kết thúc khi và chỉ khi 2 goroutine kết thúc trước. Ở ví dụ bài trước chúng ta sử dụng chờ nhập từ bàn phím nhưng tôi thấy cách đó bất tiện trong ví dụ này. Cách thức sử dụng như sau:
      💩 Khai báo biến kiểu WaitGroup như ở dòng 11.
      💩 Tạo điều kiện chờ bằng hàm Add(<số điều kiện chờ>) như ở dòng 21. Ở đây mình có 2 goroutine nên tham số là 2.
      💩 Gọi hàm Wait() như ở dòng 30. Lúc này chương trình gọi nó sẽ bị khóa cho đến khi đủ điều kiện mở khóa chờ.
      💩 Tại nơi cần xác định thực thi xong, gọi hàm Done() để thông báo một điều kiện được thỏa như dòng 37. Cách gọi defer là để thông báo hàm wg.Done() sẽ được gọi khi kết thúc hàm này, cũng là kết thúc goroutine tương ứng.
      💩 Như vậy khi cả hai goroutine thực thi xong thì wg thỏa điều kiện chờ nên dòng 31 được thực thi.
    • Hàm init() là hàm thường dùng để khai báo các thông số ban đầu. Hàm này được thực thi trước khi khối lệnh hàm main được thực thi. Ở dòng 15, chúng ta tạo nhân cho bộ tạo số ngẫu nhiên dựa trên tham số là thời điểm gọi. Cách làm này giúp cho mỗi lần chạy chương trình, các số ngẫu nhiên sẽ được tạo mới hoàn toàn đảm bảo cho việc lấy số ngẫu nhiên thật sự ngẫu nhiên, không lặp lại.
    • Đầu tiên dữ liệu được truyền vào kênh như ở dòng 28. Đây được xem là hành động giao bóng. Dòng 41 thể hiện việc nhận bóng từ đối thủ. Ở dòng này có biến ok. Biến này cho giá trị false nếu kênh bị đóng, tương ứng khi đối thủ đánh hỏng/lỗi. Trường hợp này thì người chơi sẽ thắng. Trong trường hợp nhận được bóng, người chơi sẽ đánh lại về phía đối thủ như ở dòng 62.
    • Cơ chế xác định đánh hỏng/lỗi là lấy giá trị ngẫu nhiên, nó chia hết cho 13 thì người chơi này đánh hỏng như ở dòng 49-56. Nếu đánh hỏng, kênh sẽ được đóng như ở dòng 54 đồng thời kết thúc hàm cũng là kết thúc goroutine, kết thúc ván đấu của người chơi này. Người chơi còn lại phải chờ khi nhận được thông tin kênh đã đóng thì sẽ kết thúc luôn ván đấu của mình bằng cách kết thúc hàm, kết thúc goroutine bằng lệnh return ở dòng 45.

    a686ed22-df6e-42b8-90b0-535f85bb69cc-image.png

    Nếu hứng thú bạn có thể tiếp tục phát triển ví dụ trên để tạo thành một trận đánh tennis hoàn chỉnh.

    Kênh đa nghĩa là tại một thời điểm kênh có thể chứa nhiều hơn một giá trị dữ liệu. Số giá trị dữ liệu mà nó có thể chứa được khai báo khi tạo kênh. Cách tạo kênh đa cũng tương tự như tạo kênh đơn với hàm make, có thêm tham số xác định số giá trị dữ liệu tối đa có thể chứa. Cụ thể để tạo một kênh đa chứa 5 giá trị kiểu chuỗi, ta khai báo như sau: ch := make(chan string, 5).

    Hoạt động gửi dữ liệu sẽ thêm dữ liệu vào cuối danh sách dữ liệu lưu trong kênh. Ngược lại hoạt động nhận sẽ lấy dữ liệu từ đầu danh sách dữ liệu trong kênh. Hàm len(ch) trả về số giá trị dữ liệu đang có trong kênh và hàm cap(ch) cho biết số giá trị dữ liệu tối đa mà kênh có thể chứa.

    Điều kiện khóa goroutine trên kênh đa cũng thay đổi. Goroutine nhận dữ liệu chỉ bị khóa nếu không còn dữ liệu trong kênh để nhận. Ngược lại, goroutine gửi dữ liệu chỉ bị khóa khi khả năng chứa của kênh đã hết. Điều này khiến cho 2 goroutine gửi và nhận không nhất thiết phải đồng bộ như ở kênh đơn.

    4efd94fa-3dc7-4756-abc9-177639b1b2f8-image.png

    • Bước 1: Goroutine (GR) gửi bên trái chuẩn bị gửi vào kênh đa, trong khi GR nhận bên phải bắt đầu quá trình nhận dữ liệu. Kênh đa lúc này có sẵn 5 giá trị dữ liệu.
    • Bước 2: GR gửi bắt đầu quá trình gửi dữ liệu trong khi GR nhận đã nhận xong một giá trị dữ liệu.
    • Bước 3: GR bên trái đang gửi thêm một dữ liệu, trong khi GR bên phải đang nhận thêm một dữ liệu. 2 goroutine chạy song song mà không hề bị khóa.
    • Bước 4: Cả 2 GR trái và phải đều hoàn tất quá trình gửi và nhận dữ liệu. Trong kênh đa hiện vẫn còn 5 giá trị dữ liệu.

    Bây giờ chúng ta cùng khảo sát ví dụ về kênh đa. Giả sử chúng ta có 10 công việc cần hoàn thành bởi 4 người, chúng ta sẽ mô phỏng quá trình thực hiện 10 công việc này dưới dạng 4 goroutine nhận 10 giá trị từ kênh đa như sau:

     package main 
      
     import ( 
         "fmt" 
         "math/rand" 
         "sync" 
         "time" 
     ) 
      
     const ( 
         numWorkers = 4 
         numTasks = 10 
     ) 
      
     var wg sync.WaitGroup 
      
     func init() { 
         rand.Seed(time.Now().Unix()) 
     } 
      
     func main() { 
         // Tạo kênh đa chứa các công việc 
         tasks := make(chan string, numTasks) 
      
         wg.Add(numWorkers) 
         for gr := 1; gr <= numWorkers; gr++ { 
             go worker(tasks, gr) 
         } 
      
         // Tạo chuỗi các công việc cần thực thi cho vào kênh 
         for t := 1; t <= numTasks; t++ { 
             tasks <- fmt.Sprintf("công việc thứ %d.", t) 
         } 
      
         close(tasks) 
      
         wg.Wait() 
     } 
      
     // Thực hiện tác vụ, thực nhất là tạo và xử lý một goroutine 
     func worker(tasks chan string, worker int) { 
         defer wg.Done() 
      
         for { 
             // Chờ nhận công việc 
             task, ok := <-tasks 
             if !ok { 
                 // Hết việc 
                fmt.Printf("Người thứ %d: Hoàn thành các công việc được giao!\n", worker) 
                 return 
             } 
      
             fmt.Printf("Người thứ %d: Bắt đầu thực hiện %s\n", worker, task) 
      
             // Giả lập thời gian thực hiện công việc 
             sleep := rand.Int63n(100) 
             time.Sleep(time.Duration(sleep) * time.Millisecond) 
      
             fmt.Printf("Người thứ %d: Thực hiện xong %s\n", worker, task) 
         } 
     } 
    
    • Dòng 10-12 khai báo hằng số là số người và số công việc cần thực hiện.
    • Dòng 21-38: hàm main thực hiện tạo ra các goroutine đóng vai trò thực thi các công việc là các giá trị lưu trong kênh đa:
      💩 Dòng 23 tạo kênh đa tasks chứa tối đa là số công việc thực hiện: 10.
      💩 Dòng 26-28 tạo ra 4 goroutine đóng vai trò 4 người thực hiện các công việc nhận qua kênh đa tasks.
      💩 Dòng 31-33 tạo ra 10 công việc thể hiện 10 chuỗi định danh công việc gửi vào kênh đa tasks.
      💩 Dòng 35 đóng kênh để đảm bảo khi hết việc tức hết dữ liệu ở kênh, các goroutine sẽ được báo là hoàn thành công việc rồi.
    • Dòng 41-61: hàm đóng vai trò thực thi công việc thực chất là lặp quy trình như thể hiện ở dòng 44-60:
      💩 Nhận việc qua việc nhận giá trị từ kênh đa như dòng 47
      💩 Dòng 47 sẽ cho biết hết việc nếu ok cho giá trị false. Có được điều này là nhờ dòng 35 đã đóng kênh tasks, nên các goroutine khi đọc hết dữ liệu sẽ nhận giá trị false cho biến ok. Khi hết việc vòng lặp kết thúc, công việc hoàn thành.
      💩 Khi ok là true, nhận việc thành công thì bắt đầu thực hiện như thông báo ở dòng 53.
      💩 Thời gian thực hiện công việc vừa nhận được tạo ngẫu nhiên như dòng 56 và dòng 57 thể hiện khoảng thời gian thực thi.
      💩 Dòng 59 thông báo đã thực hiện xong và quay lại chu trình nhận việc mới.

    cb1ed706-00f8-46b4-a659-c4c39f4dea25-image.png

    Việc sử dụng kênh đơn hay kênh đa là chúng ta tự cân nhắc. Một điểm lưu ý là dùng kênh đơn hay đa thì cũng sẽ có lúc trạng thái các goroutine bị khóa. Nếu không có giải pháp xử lý sẽ khiến goroutine bị treo. Tình huống này Go không phát hiện và giúp được cho chúng ta nên cần cẩn thận.

    Một lưu ý khác là kênh chỉ dùng cho trao đổi dữ liệu giữa các goroutine. Việc khai báo kênh, thực hiện gửi nhận trên cùng một groutine sẽ dẫn đến hiện tượng treo goroutine (deadlock).

    Bài tiếp theo là test chương trình Go.

    Tóm tắt

    • Kênh là cơ chế trao đổi dữ liệu giữa các goroutine. Mỗi kênh chỉ trao đổi 1 loại dữ liệu.
    • Biến kênh là kiểu tham chiếu, được tạo bằng lệnh make. Hai hành động chính của kênh là gửi và nhận. Một hành động khác là đóng kênh.
    • Có 2 loại kênh: kênh đơn (unbuffered channel) và kênh đa (buffered channel).
    • Kênh đơn chỉ chứa tối đa một giá trị dữ liệu trong kênh. Kênh đơn đòi hỏi đồng bộ trong gửi nhận: khi gửi thì goroutine thực hiện gửi bị khóa cho đến khi việc nhận kết thúc và goroutine thực hiện nhận bị khóa cho đến khi nhận xong dữ liệu từ kênh.
    • Kênh đa có thể chứa nhiều hơn một giá trị dữ liệu và chứa tối đa số giá trị được khai báo. Goroutine gửi chỉ bị khóa khi khả năng chứa của kênh bị đầy và goroutine nhận chỉ bị khóa khi kênh trống dữ liệu.
    • Sử dụng kênh đơn hay đa đều cẩn thận để tránh bị khóa.


Có thể bạn cũng quan tâm

.
DMCA.com Protection Status