運用 Go 語言做 EMR 串流

本文翻譯自 NY Times,原作者為 JP Robinson:https://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/?_php=true&_type=blogs&_php=true&_type=blogs&_r=1

我們的平台團隊使用 Amazon 的 Elastic MapReduce (EMR) 服務,協助我們從日誌檔收集有用的數據。我們有一些程序會抓取日誌檔,接著壓縮並推送到 Amazon S3 上儲存。感謝 EMR,這套模式聚集出回溯數年之久的龐大資訊,等待我們進行資料重整 (data crunching)。一開始我們用 Python 做了不少苦功,不過我們漸漸地轉而依賴 Go。


當我們一開始使用 EMR,我的團隊用 Python 來寫 mapper 跟 reducer 腳本。之所以選擇 Python,是因為只要最簡單的設定與程式碼,就可以寫出一支能夠從標準輸入讀取 JSON 或 CSV 記錄,並向標準輸出寫出類似結構化資料的 Python 腳本。還有,既然我們已經決定使用 boto Python 程式庫來啟動 EMR 工作流,並管理其輸出,在整個專案中使用同一種語言是較為合理的。

Python 對於簡單的處理做得很好。有一段時間的性能表現相當不錯,既然該腳本十分簡單,我們甚至不需要用到第三方模組。最後,我們遇到的情況需要引入一個內部模組,來重用部分商業邏輯,或是藉由第三方模組來挖掘資料的意義,因此在這個 EMR 工作流增加了新一層的複雜度:bootstrap 動作。

一旦準備完成之後,Bootstrap 動作會在叢集的每一個節點上執行,對於簡單的第三方程式庫,bootstrap 動作可以執行類似 sudo apt-get install python-nltk 的命令。要載入紐約時報內部的程式庫,我們必須把它放在 S3 上,然後寫一支安裝腳本當作 bootstrap 動作。兩種情況都很繁瑣且耗時,我們希望有更優雅的解決方案。既然我們越來越常用 Go 來寫程式庫和服務,我們就想它是不是適用於這個情況。

改用 Go

我的團隊花了一年時間,用 Go 建構後端服務與 web APIs。正如 Dave Cheney 最近在 Gocon 發表的演說所指出,由於 Go 提供的並行性、易於部署、優秀效能,它通常是一個絕佳的選擇。此外,它的語法清晰,有動態語言的感覺,但又仍屬靜態型別。

當我們在線上有了若干 Go 的服務,我們注意到,在串流 mapper 之中重用這些服務中的部分邏輯,看來是可行的。結果用 Go 來寫 mapper (還有 reducer) 和 Python 比起來,並不會花太多力氣。為了示範,我會並列出用 Python 和 Go 所寫的簡單 mapper/reducer 當作例子。

Python Mapper


import sys
import simplejson as json

def main():

    # loop through each line of stdin
    for line in sys.stdin:

        # parse the incoming json 
        j = json.loads(line.strip())

        # initialize output structure
        output = dict()

        # grab an identifier
        output["key"] = j["data"]["key"]

        # and any other useful information from input json
        output["secondary-key"] = j["data"]["another-key"]
        output["first-metric"] = j["data"]["metric"]
        output["second-metric"] = j["data"]["metric-2"]

    except Exception as e:
        sys.stderr.write("unable to read log: %s" % e)


        # generate json output
        output_json = json.dumps(output)

        # write the key and json to stdout    
        print "%s\t%s" % (output["key"], output_json) 

    except Exception as e:
        sys.stderr.write("unable to write mapper output: %s" % e)

if __name__ == "__main__":

Go mapper

package main

import (

func main() {
     var line []byte
     var input logRecord
     var output mapperOutput
     var outputJSON []byte
     var err error

     // loop through each line of stdin
     ls := bufio.NewScanner(os.Stdin)
     for ls.Scan() {
      line = ls.Bytes()

      // parse the incoming json
      if err = json.Unmarshal(line, &input); err != nil {
           log.Print("unable to read log: ", err)

      // initialize output structure
      output = mapperOutput{
           // grab an identifier
           // and any other useful information from input json

      // generate json output
      if outputJSON, err = json.Marshal(output); err != nil {
           log.Print("unable to write mapper output: ", err)

      // write the key and json to stdout
      fmt.Fprintf(os.Stdout, "%s\t%s\n", output.Key, outputJSON)

     if ls.Err() != nil {
      log.Print("error reading from stdin: ", ls.Err())

type logRecord struct {
     Data struct {
      Key           string `json:"key"`
      AnotherKey    string `json:"another-key"`
      Metric        int64  `json:"metric"`
      AnotherMetric int64  `json:"metric-2"`
     } `json:"data"`

type mapperOutput struct {
     Key          string `json:"key"`
     SecondaryKey string `json:"secondary-key"`
     FirstMetric  int64  `json:"first-metric"`
     SecondMetric int64  `json:"second-metric"`

Python reducer


import sys
import simplejson as json

def main():

    ongoing_count = {"key":""}

    # loop through each line for stdin
    for line in sys.stdin:

        # split line to separate key and value
        key_val = line.split("\t", 1)
        key = key_val[0]

        # parse the incoming json
        data = json.loads(key_val[1])

        # check if incoming key equals ongoing key
        if key == ongoing_count["key"]:
        # inrement ongoing metrics
        ongoing_count["first-metric"] += data["first-metric"]
        ongoing_count["second-metric"] += data["second-metric"]

        # if a new key, emit ongoing counts

        # set ongoing count with current data
        ongoing_count = data

    except Exception as e:
        sys.stderr.write("unable to parse reducer input: %s" % e)

    # emit the final counts

def writeOutput(ongoing_count):
    if ongoing_count["key"] != str():

        # generate json output
        output_json = json.dumps(ongoing_count)

    except Exception as e:
        sys.stderr.write("unable to create reducer json: %s" % e)

    # write the key and json to stdout
    print "%s\t%s" % (key, output_json)

if __name__ == "__main__":

Go reducer

package main

import (

var tab = []byte("\t")

func main() {
    var rawInput []string
    var input mapperOutput
    var ongoingCount mapperOutput
    var err error

    //  loop through each line for stdin
    ls := bufio.NewScanner(os.Stdin)
    for ls.Scan() {

    // split line to separate key and value
    rawInput = bytes.SplitN(ls.Bytes(), tab, 2)

    // parse the incoming json
    if err = json.Unmarshal(rawInput[1], &input); err != nil {
        log.Print("unable to parse reducer input: ", err)

    // check if incoming key equals ongoing key
    if ongoingCount.Key == input.Key {
        // inrement ongoing metrics
        ongoingCount.FirstMetric += input.FirstMetric
        ongoingCount.SecondMetric += input.SecondMetric

    } else {
        // if a new key, emit ongoing counts
        // set ongoing count with current data
        ongoingCount = input



    if ls.Err() != nil {
    log.Print("error reading from stdin: ", ls.Err())

    // emit the final counts


func writeOutput(o mapperOutput) {
    if len(o.Key) == 0 {
    // generate json output
    data, err := json.Marshal(o)
    if err != nil {
    log.Print("unable to marshal reducer output: ", err)
    // write the key and json to stdout
    fmt.Fprintf(os.Stdout, "%s\t%s\n", o.Key, data)

type mapperOutput struct {
    Key          string `json:"key"`
    SecondaryKey string `json:"secondary-key"`
    FirstMetric  int64  `json:"first-metric"`
    SecondMetric int64  `json:"second-metric"`

Go 的實作在程式碼數量上有略微增加,不過,對照 Go 提供的簡易部署和效能提升,還是值得的。由於程式被編譯成單一的二進位檔,我們可以包進我們所需要的所有第三方程式庫,只要將我們的二進位檔放上 S3 就完成了部署工作,也不需要 bootstrap 動作。

我們在速度上有了很棒的提升。我在同一份資料上跑 Go 實作和舊的 Python mapper/reducer,歷經幾次執行之後,我發現平均約提升了 25% 的速度。兩者的這些 mapper/reducers 在 CSV、JSON、regex 上,都使用標準程式庫 (除了 Python 的 simplejson 是個例外)。

當我們持續建構與改進我們的平台技術,我們對 Go 越來越有信心且熟悉。從後台程序到簡單的 MapReduce 腳本,Go 都是我們團隊在伺服器端的優先選擇。它讓我們打造高性能且可靠又易於維護的服務,Go 社群的熱情,加上高品質釋出版本的速度,讓我們樂於渴望看到這套語言的未來。

