hide's memo
28 4月, 2022

PostgreSQLをADFのシンクとして使用する方法(Azure Data Factory)

[English]

ADFではPostgreSQLをシンクとして使用するためには、セルフホステッド統合ランタイムを使用する必要ある。
そのため、まずはセルフホステッド統合ランタイム環境を構築する。
(セフルホステッドランタイムとは(Azure Data Factory) を参照)

また、PpostgreSQLをシンクのリンクサービスとしては使用できないので、
ODBCをリンクサービスとして使用する必要があり、構成としては下図のようになる。

 

■Step1

PostgreSQLのODBCドライバをインストールするする。

(1)PostgreSQL ODBC ドライバをダウンロードする
(1.1) https://www.postgresql.org/ にアクセス
(1.2)”Download” -> “File Browser” -> “odbc” -> “versions” -> “msi” の順でリンクをたどる。
(1.3)psqlodbc_13_02_0000-x64.zip をダウンロード (最初、Version 9.2. で試したがADFトン連携でうまく動かなかった)

(2)ODBC ドライバをインストールする

■Step2

(1)コピーアクテビティのシンクのデータセットに”ODBC” を選択
(2)ODBCのリンクサービスを以下のように設定

 

15 4月, 2022

セフルホステッドランタイムとは(Azure Data Factory)

[English]

セルフホステッドランタイムは、下図のように
自分のWindoowsPCや、自分が管理しているWindowsサーバ上で、ADFのパイプラインの一部を動かすことができる。
そのため、イントラ内の情報にアクセスしたい場合には、セフルホステッドランタイムをイントラ内で動かすことでアクセス可能になる。

 

25 3月, 2022

WebAccessアクテビティの出力 (Azure Data Factory)

あるWebAPIが jsonを返す場合に、レスポンスボディの内容によって、

Azure Data Factoryの WebAccessアクティビティに格納される情報が以下のように変わる。

1. 完全に空(0バイト)の場合のWebAccessの出力配下のようになる

{
   "Response" :"",
   "ADFWebActivityResponseHeader"  : {
       "Date" : ....
          :
   },
   "effectiveIntegrationRuntime" : "xxxxx",
   "executionDuration" : 0,
    :
}

上記のような結果になるので、@activity(‘Web’).output.Responseで内容を取得可能

 

2. かっこしかない場合

ResopnseBodyの内容

{}

WebAccessの出力結果は、以下のように”Response”がないので@activity(‘Web’).output.Responseでアクセスできない。(落ちる)

{
   "ADFWebActivityResponseHeader"  : {
      "Date" : ....
        :
    },
    "effectiveIntegrationRuntime" : "xxxxx",
    "executionDuration" : 0,
        :
}

 

3.配列のフィールドが1つだけ

{ "data" : [ ] }

WebAccessの出力結果は、以下のようになる。@activity(‘Web’).output.dataで配列にアクセスできる。

{
   "data" :[],
   "ADFWebActivityResponseHeader"  : {
      "Date" : ....
         :
   },
   "effectiveIntegrationRuntime" : "xxxxx",
   "executionDuration" : 0,
      :
}

4. 配列の中身がある

{
  "data" : [
     {
        "name" :" aaa",
        "score" : 30
     }
   ]
}

WebAccessの出力結果は、以下のようになる。@activity(‘Web’).output.dataで配列にアクセスできる。

{
    "data" : [
        {
            "name" :" aaa",
            "score" : 30
         }
   ],
   "ADFWebActivityResponseHeader"  : {
      "Date" : ....
        :
   },
   "effectiveIntegrationRuntime" : "xxxxx",
   "executionDuration" : 0,
      :
}
4 3月, 2022

CopyアクティビティのソースにRESTを設定し、動的なURLを指定する(Azure Data Factory)

[English]

Copy アクティビティのSRC、REST-APIのURLに変数を使う方法。

1. DataSet を作成。
1.1. リンクサービスはREST でベースURLはなんでもよい(例 http://localhost/)
1.2. パラメータを新規追加、例として変数名を URLとする。

 

1.3. 接続の相対URLを、@dataset().URLとする

 

2. パイプライン作成
2.Copyアクティビティ追加
2.1. SRCを選択
(1)SRCに、上記1 の DataSetを選択
(2)データセットのプロパティの部分に、URLが出てくるてので、指定したい値をセットする。

18 2月, 2022

csvをjsonにCopy アクティビティを使って変換 (Azure Data Factory)

[English]

以下のようなフォーマット変換をCopy アクティビティを使って実現する。

 

1.Source設定

追加の列に以下を設定(マッピング時に固定値や同じカラムを複数回出現させられないので、ソース側で値を用意する)

 

2.マッピング

 

 

■関連記事

csvファイルをjson に変換してパイプライン変数に格納 (Azure Data Factory)

17 2月, 2022

WebAccessの結果レスポンスを、そのまま次のWebAccessのPOSTに利用する(Azure Data Factory)

[english]

Adfで、1つ前のWebAccessアクティビティの結果を、次のWebAccessアクティビティのPOSTのBODYに使用する。

【注意】Web Accessアクテビティが扱えるサイズは4MBという制限があります。MSのサイト

Web2 アクティビティの設定

メソッド:POST

本文: @concat(‘{ “records”:’, activity(‘Web1’).output.records, ‘}’)

 

Web1のアクティビティの結果は、そのまま単純に文字列ではない。値を取り出すのに、activity(‘Web1’).output.records を指定するが、

これは配列オブジェクトになっており、 最初の

{
     "records" : [

の部分と、最後の

   ]
}

の部分がなくなっている。そのため、conca()関数を使って、再度組み立てている。

 

13 2月, 2022

csvファイルをjson に変換してパイプライン変数に格納 (Azure Data Factory)

[English]

BLOBストレージ上のcsvファイルをパイプライン変数に格納する例。

csvファイルの内容をjsonにしてWeb Accessアクティビティに使用したい場合。

本来は、Copyアクティビティを使って変換するが、Copyアクティビティではできないような細かな処理を行いたい場合に、無理やり文字列操作で実現する。

この処理の場合、改行コードなどJSONでエスケープしなければならない文字列を全て自前でエスケープする必要がある。

1.BLOBストレージ上のCSVファイル


"name01","value01"
"name02","value02"
 

2.変換後のjson

{
     "records" : [
          { "id" : "name01", "info" : "value01"},
          { "id" : "name02", "info" : "value02"}
     ]
}

 

3. パイプライン

3.1. パイプライン変数

BUFFER_V1_1   String
BUFFER_V1_2   String
BUFFER_V1_3   String

3.2. パイプライン構成

3.3. Lookup1 (参照)

  • SRC …. json ファイルを指定

  • 「先頭行のみ」のチェックをはずす

 

3.4. ForEach1 (繰り返し)

  • 順次にチェック

  • 項目:
    @activity('Lookup1').output.value

 

3.5. Set V1 (変数の設定)

  • 変数:BUFFER_V1_1

  • 項目:
    @concat(variable('BUFFER_V1_2'), '{"id":"', ESCAPE(item().Prop_0), '","info":"', ESCAPE(item().Prop_1),'"}')


ここで、上記 ESCAPE()の部分は実際には以下の記述になる。

uriComponentToString(replace(replace(replace(replace(replace(urlComponent(coalesce( *** , ”)),’%5C’,’%5C%5C’), ‘%22′,’%5C%22’), ‘%0D’,’%5Cr’), ‘%0A’, ‘%5Cn’), ‘%09’, ‘%5Ct’))

replace(**, ‘%5C’,’%5C%5C) ….. “\”を”\\”に置換

replace(**, ‘%22’,’%5C%22) ….. “”” を “\””に置換

replace(‘**’, ‘%0D’,’%5Cr’) …. \rを “\r”に置換

replace(‘**’, ‘%0A’,’%5Cn’) …. \nを “\n”に置換

replace(‘**’, ‘%09′,’%5Ct’) ….. \t を “\t”日間

3.6. Set V2 (変数の設定)

  • 変数:BUFFER_V1_2
  • 項目:
    @concat(variable('BUFFER_V1_1'), ',')

 

3.7. Set V3 (変数の設定)

  • 変数:BUFFER_V1_3
  • 項目:
    @concat(variable('BUFFER_V1_3'), '{ "records" [', variable('BUFFER_V1_1'),'}')

 

 

以上で、上記パイプラインが終わったあとに、変数 BUFFER_V1_31には、返還後のjson文字列が入っている。

 

 

■関連記事

csvをjsonにCopy アクティビティを使って変換 (Azure Data Factory)

13 2月, 2022

jsonファイルをパイプライン変数に格納

[English]

BLOBストレージ上のjsonファイルをパイプライン変数に格納する例。

jsonファイルの内容を少し変えて(キー名等を変えて)Web Accessアクティビティに使用したい場合。

本来は、Copyアクティビティを使って変換するが、Copyアクティビティではできないような細かな処理を行いたい場合に、無理やり文字列操作で実現する。

この処理の場合、改行コードなどJSONでエスケープしなければならない文字列を全て自前でエスケープする必要がある。

1.BLOBストレージ上のjsonファイル

{
     "datarecords" : [
          {  "name" : "name01",  "value" : " value01" },
          {  "name" : "name02",  "value" :  "value02" }
    ]
}

2.変換後のjson

{
     "records" : [
          { 
              "attr" : { "rid" : "name01"},
                "id" : "name01",
"info" : "value01"
          }, {
"attr" : { "rid" : "name01"},
               "id" : "name02",
"info" : "value02"
          } ] }

 

3. パイプライン

3.1. パイプライン変数

BUFFER_V1_1   String
BUFFER_V1_2   String
BUFFER_V1_3   String

3.2. パイプライン構成

3.3. Lookup1 (参照)

  • SRC …. json ファイルを指定

  • 「先頭行のみ」のチェックをはずす

 

3.4. ForEach1 (繰り返し)

  • 順次にチェック

  • 項目:
    @activity('Lookup1').output.value[0].datarecords

 

3.5. Set V1 (変数の設定)

  • 変数:BUFFER_V1_1

  • 項目:
    @concat(variable('BUFFER_V1_2'), '{ "attr":{ "rid": "', ESCAPE(item().name), '"}, "id":"', ESCAPE(item().name), '","info":"', ESCAPE(item().value),'"}')

ここで、上記 ESCAPE()の部分は実際には以下の記述になる。

uriComponentToString(replace(replace(replace(replace(replace(urlComponent(coalesce( *** , ”)),’%5C’,’%5C%5C’), ‘%22′,’%5C%22’), ‘%0D’,’%5Cr’), ‘%0A’, ‘%5Cn’), ‘%09’, ‘%5Ct’))

replace(**, ‘%5C’,’%5C%5C) ….. “\”を”\\”に置換

replace(**, ‘%22’,’%5C%22) ….. “”” を “\””に置換

replace(‘**’, ‘%0D’,’%5Cr’) …. \rを “\r”に置換

replace(‘**’, ‘%0A’,’%5Cn’) …. \nを “\n”に置換

replace(‘**’, ‘%09′,’%5Ct’) ….. \t を “\t”日間

 

3.6. Set V2 (変数の設定)

  • 変数:BUFFER_V1_2
  • 項目:
    @concat(variable('BUFFER_V1_1'), ',')

 

3.7. Set V3 (変数の設定)

  • 変数:BUFFER_V1_3
  • 項目:
    @concat(variable('BUFFER_V1_3'), '{ "records" [', variable('BUFFER_V1_1'),'}')

 

 

以上で、上記パイプラインが終わったあとに、変数 BUFFER_V1_31には、返還後のjson文字列が入っている。