OpenAI function calling with Elasticsearch

Introduction

Function calling in OpenAI refers to the capability of AI models to interact with external functions or APIs, allowing them to perform tasks beyond text generation. This feature enables the model to execute code, retrieve information from databases, interact with external services, and more, by calling predefined functions.

The model recognizes which function needs to be called based on the user prompt and generates a call to it with the appropriate arguments; your application code then executes the function. The arguments are generated dynamically by the model.

Possible use cases include:

  • Data Retrieval: Accessing real-time data from databases or APIs. (e.g., weather information, stock prices)
  • Enhanced Interactions: Performing complex operations that require logic and computation (e.g., booking a flight, scheduling a meeting).
  • Integration with External Systems: Interacting with external systems and tools (e.g., executing a script, sending an email).

In this blog we’re going to create two functions:

  1. fetch_from_elasticsearch() - Fetch data from Elasticsearch using a natural language query.
  2. weather_report() - Fetch a weather report for a particular location.

We'll integrate function calling to dynamically determine which function to call based on the user's query and generate the necessary arguments accordingly.

[Figure: sequence diagram of function calling with Elastic]

Prerequisites

Elastic

Create an Elastic Cloud deployment to get all Elastic credentials.

  • ES_API_KEY: Create an API key.
  • ES_ENDPOINT: Copy the Elasticsearch endpoint.

OpenAI

  • OPENAI_API_KEY: Set up an OpenAI account and create a secret key.
  • GPT_MODEL: We’re going to use the gpt-4o model, but you can check the OpenAI documentation to see which models support function calling.

Open-Meteo API

We will use the Open-Meteo API. Open-Meteo is an open-source weather API that offers free access for non-commercial use. No API key is required.

  • OPEN_METEO_ENDPOINT: https://api.open-meteo.com/v1/forecast

Sample Data

After creating the Elastic Cloud deployment, let’s add the sample flight data in Kibana. The sample data will be stored in the kibana_sample_data_flights index.

Python notebook

We are going to create a quick Python notebook for the entire flow. Install the dependencies below and create a Python script/notebook. The requests package is included because the code below imports it.

pip install openai requests

Import packages

from openai import OpenAI
from getpass import getpass
import json
import requests

Accept Credentials

OPENAI_API_KEY = getpass("OpenAI API Key:")
client = OpenAI(
    api_key=OPENAI_API_KEY,
)
GPT_MODEL = "gpt-4o"

ES_API_KEY = getpass("Elastic API Key:")
ES_ENDPOINT = input("Elasticsearch Endpoint:")
ES_INDEX = "kibana_sample_data_flights"

OPEN_METEO_ENDPOINT = "https://api.open-meteo.com/v1/forecast"

Function1: fetch_from_elasticsearch()

def fetch_from_elasticsearch(nl_query):

This function accepts the nl_query parameter, a natural language (English) string, and returns the Elasticsearch JSON response as a string. It executes all queries on the kibana_sample_data_flights index, which holds all the flight-related data.

It consists of three steps / sub-functions:

  1. get_index_mapping() - Returns the mapping for the index.
  2. get_ref_document() - Returns a sample document for reference.
  3. build_query() - Uses the GPT model (gpt-4o) with a few-shot prompt to convert the user's question (text) into Elasticsearch Query DSL.

Let's continue the notebook by adding all of these functions.

Get Index Mapping

def get_index_mapping():

    url = f"""{ES_ENDPOINT}/{ES_INDEX}/_mappings"""

    headers = {
        "Content-type": "application/json",
        "Authorization": f"""ApiKey {ES_API_KEY}""",
    }

    resp = requests.request("GET", url, headers=headers)
    resp = json.loads(resp.text)
    mapping = json.dumps(resp, indent=4)

    return mapping

Get reference document

def get_ref_document():

    url = f"""{ES_ENDPOINT}/{ES_INDEX}/_search?size=1"""

    headers = {
        "Content-type": "application/json",
        "Authorization": f"""ApiKey {ES_API_KEY}""",
    }

    resp = requests.request("GET", url, headers=headers)
    resp = json.loads(resp.text)
    json_resp = json.dumps(resp["hits"]["hits"][0], indent=4)

    return json_resp

Note: You can also cache the index mapping and reference document to avoid frequent queries to Elasticsearch.
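
One way to do that caching, shown here only as a minimal sketch, is the standard library's functools.lru_cache. It assumes the mapping does not change during the session. fetch_mapping_from_cluster() is a hypothetical stand-in for the HTTP call made in get_index_mapping(), so the example runs without a cluster.

```python
from functools import lru_cache

# Counts how many times the (simulated) cluster is actually queried.
call_count = {"mapping": 0}


def fetch_mapping_from_cluster(index_name):
    """Hypothetical stand-in for the HTTP request in get_index_mapping()."""
    call_count["mapping"] += 1
    return '{"' + index_name + '": {"mappings": {}}}'


@lru_cache(maxsize=None)
def get_cached_mapping(index_name):
    # The first call per index name reaches the cluster; later calls reuse the result.
    return fetch_mapping_from_cluster(index_name)


first = get_cached_mapping("kibana_sample_data_flights")
second = get_cached_mapping("kibana_sample_data_flights")
print(call_count["mapping"])
```

If the index mapping does change, get_cached_mapping.cache_clear() drops the cached value so the next call fetches it again.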

Generate Elasticsearch Query DSL based on user query

def build_query(nl_query):

    index_mapping = get_index_mapping()
    ref_document = get_ref_document()

    few_shots_prompt = """
    1. User Query - Average delay time of flights going to India
        Elasticsearch Query DSL:
         {
          "size": 0,
          "query": {
            "bool": {
              "filter": {
                "term": {
                  "DestCountry": "IN"
                }
              }
            }
          },
          "aggs": {
            "average_delay": {
              "avg": {
                "field": "FlightDelayMin"
              }
            }
          }
        }

        2. User Query - airlines with the highest delays
        Elasticsearch Query DSL:
         {
          "size": 0,
          "aggs": {
            "airlines_with_highest_delays": {
              "terms": {
                "field": "Carrier",
                "order": {
                  "average_delay": "desc"
                }
              },
              "aggs": {
                "average_delay": {
                  "avg": {
                    "field": "FlightDelayMin"
                  }
                }
              }
            }
          }
        }

        3. User Query - Which was the last flight that got delayed for Bangalore
        Elasticsearch Query DSL:
        {
          "query": {
            "bool": {
              "must": [
                { "match": { "DestCityName": "Bangalore" } },
                { "term": { "FlightDelay": true } }
              ]
            }
          },
          "sort": [
            { "timestamp": { "order": "desc" } }
          ],
          "size": 1
        }
    """

    prompt = f"""
        Use below index mapping and reference document to build Elasticsearch query:

        Index mapping:
        {index_mapping}

        Reference elasticsearch document:
        {ref_document}

        Return single line Elasticsearch Query DSL according to index mapping for the below search query related to flights.:

        {nl_query}

        If any field has a `keyword` type, Just use field name instead of field.keyword.

        Just return Query DSL without REST specification (e.g. GET, POST etc.) and json markdown format (e.g. ```json)

        few example of Query DSL

        {few_shots_prompt}

    """

    resp = client.chat.completions.create(
        model=GPT_MODEL,
        messages=[
            {
                "role": "user",
                "content": prompt,
            }
        ],
        temperature=0,
    )

    return resp.choices[0].message.content

Note: Sometimes it may be necessary to modify the prompt to get a more accurate response (Query DSL) or more consistent output. While we rely on the model's own knowledge to generate queries, reliability can be increased with few-shot prompting for more complex queries. Few-shot prompting means providing examples of the types of queries you want the model to return, which helps increase consistency.
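
Even with a careful prompt, the model may occasionally wrap the query in a markdown fence or return something that is not JSON. The following is a hedged sketch of a check you could run on the output of build_query() before sending it to Elasticsearch. clean_query_dsl() is a hypothetical helper, not part of the original notebook.

```python
import json

FENCE = "`" * 3  # the three-backtick markdown fence marker


def clean_query_dsl(model_output):
    """Strip an optional markdown fence and confirm the text parses as a JSON object.

    Returns the query as a JSON string, or raises ValueError.
    """
    text = model_output.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (for example the one tagged "json") and the closing fence.
        lines = text.splitlines()[1:]
        if lines and lines[-1].strip() == FENCE:
            lines = lines[:-1]
        text = "\n".join(lines)
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Model did not return valid JSON: {exc}") from exc
    if not isinstance(parsed, dict):
        raise ValueError("Query DSL must be a JSON object")
    return json.dumps(parsed)


print(clean_query_dsl(FENCE + 'json\n{"size": 0}\n' + FENCE))
```

A ValueError here is a signal to retry the generation or to return an error message instead of sending a malformed body to the _search endpoint.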

Execute Query on Elasticsearch

def fetch_from_elasticsearch(nl_query):

    query_dsl = build_query(nl_query)
    print(f"""Query DSL: ==== \n\n {query_dsl}""")

    url = f"""{ES_ENDPOINT}/{ES_INDEX}/_search"""

    payload = query_dsl

    headers = {
        "Content-type": "application/json",
        "Authorization": f"""ApiKey {ES_API_KEY}""",
    }

    resp = requests.request("GET", url, headers=headers, data=payload)
    resp = json.loads(resp.text)
    json_resp = json.dumps(resp, indent=4)

    print(f"""\n\nElasticsearch response: ==== \n\n {json_resp}""")
    return json_resp

Text to Elasticsearch Query

Let’s call fetch_from_elasticsearch() with some sample questions.

Query1

fetch_from_elasticsearch("Average delay time of flights going to India")

Response

Query DSL: ==== 

 {
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "term": {
          "DestCountry": "IN"
        }
      }
    }
  },
  "aggs": {
    "average_delay": {
      "avg": {
        "field": "FlightDelayMin"
      }
    }
  }
}


Elasticsearch response: ==== 

 {
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 372,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "average_delay": {
            "value": 48.346774193548384
        }
    }
}

Query2

fetch_from_elasticsearch("airlines with the highest delays")

Response

Query DSL: ==== 

 {
  "size": 0,
  "aggs": {
    "airlines_with_highest_delays": {
      "terms": {
        "field": "Carrier",
        "order": {
          "average_delay": "desc"
        }
      },
      "aggs": {
        "average_delay": {
          "avg": {
            "field": "FlightDelayMin"
          }
        }
      }
    }
  }
}


Elasticsearch response: ==== 

 {
    "took": 3,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 10000,
            "relation": "gte"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "airlines_with_highest_delays": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
                {
                    "key": "Logstash Airways",
                    "doc_count": 3323,
                    "average_delay": {
                        "value": 49.59524526030695
                    }
                },
                {
                    "key": "ES-Air",
                    "doc_count": 3211,
                    "average_delay": {
                        "value": 47.45250700716288
                    }
                },
                {
                    "key": "Kibana Airlines",
                    "doc_count": 3219,
                    "average_delay": {
                        "value": 46.38397017707363
                    }
                },
                {
                    "key": "JetBeats",
                    "doc_count": 3261,
                    "average_delay": {
                        "value": 45.910763569457224
                    }
                }
            ]
        }
    }
}

Try some of these queries and see what results you get:

fetch_from_elasticsearch("top 10 reasons for flight cancellation")

fetch_from_elasticsearch("top 5 flights with expensive ticket")

fetch_from_elasticsearch("flights got delay for Bangalore")

Once you are done testing, you can comment out the print statements in the code above, which we added for debugging purposes.

Function2: weather_report()

def weather_report(latitude, longitude):

This function accepts the latitude and longitude parameters as strings. It calls the Open-Meteo API to get a weather report for the specified coordinates.

Add the function to the notebook

def weather_report(latitude, longitude):

    url = f"""{OPEN_METEO_ENDPOINT}?latitude={latitude}&longitude={longitude}&current=temperature_2m,precipitation,cloud_cover,visibility,wind_speed_10m"""

    resp = requests.request("GET", url)
    resp = json.loads(resp.text)
    json_resp = json.dumps(resp, indent=4)

    print(f"""\n\nOpen-Meteo response: ==== \n\n {json_resp}""")
    return json_resp
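
Because the latitude and longitude values will later be generated by the model, it can help to validate them before building the URL. The sketch below is one possible approach, not part of the original notebook: build_weather_url() is a hypothetical helper that uses the standard library's urlencode and rejects non-numeric or out-of-range coordinates.

```python
from urllib.parse import urlencode

OPEN_METEO_ENDPOINT = "https://api.open-meteo.com/v1/forecast"
CURRENT_FIELDS = [
    "temperature_2m",
    "precipitation",
    "cloud_cover",
    "visibility",
    "wind_speed_10m",
]


def build_weather_url(latitude, longitude):
    # float() raises ValueError for non-numeric input before it reaches the API.
    lat = float(latitude)
    lon = float(longitude)
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        raise ValueError("Coordinates out of range")
    params = {"latitude": lat, "longitude": lon, "current": ",".join(CURRENT_FIELDS)}
    # safe="," keeps the comma-separated field list unescaped in the URL.
    return f"{OPEN_METEO_ENDPOINT}?{urlencode(params, safe=',')}"


print(build_weather_url("12.96", "77.75"))
```

The returned URL can be passed to requests in place of the hand-built f-string used in weather_report().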

Test function

Let’s call the weather_report() function:

Check for Whitefield, Bangalore

weather_report("12.96","77.75")

Response

{
    "latitude": 19.125,
    "longitude": 72.875,
    "generationtime_ms": 0.06604194641113281,
    "utc_offset_seconds": 0,
    "timezone": "GMT",
    "timezone_abbreviation": "GMT",
    "elevation": 6.0,
    "current_units": {
        "time": "iso8601",
        "interval": "seconds",
        "temperature_2m": "\u00b0C",
        "precipitation": "mm",
        "cloud_cover": "%",
        "visibility": "m",
        "wind_speed_10m": "km/h"
    },
    "current": {
        "time": "2024-05-30T21:00",
        "interval": 900,
        "temperature_2m": 29.7,
        "precipitation": 0.0,
        "cloud_cover": 36,
        "visibility": 24140.0,
        "wind_speed_10m": 2.9
    }
}

Function calling

In this part we will see how the OpenAI model detects which function needs to be called based on user query and generates the required arguments.

Define functions

Let’s define both functions in an array of objects. We’re going to create a new function run_conversation().

def run_conversation(query):

    all_functions = [
        {
            "type": "function",
            "function": {
                "name": "fetch_from_elasticsearch",
                "description": "All flights/airline related data is stored into Elasticsearch. Call this function if receiving any query around airlines/flights.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Exact query string which is asked by user.",
                        }
                    },
                    "required": ["query"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "weather_report",
                "description": "It will return weather report in json format for given location co-ordinates.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "latitude": {
                            "type": "string",
                            "description": "The latitude of a location with 0.01 degree",
                        },
                        "longitude": {
                            "type": "string",
                            "description": "The longitude of a location with 0.01 degree",
                        },
                    },
                    "required": ["latitude", "longitude"],
                },
            },
        },
    ]

In each object we need to set the following properties:

type: function
name: The name of the function to be called
description: A description of what the function does, used by the model to choose when and how to call the function.
parameters: The parameters the function accepts, described as a JSON Schema object.

Check the tools reference to find out more about properties.
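
As the number of functions grows, writing each definition by hand becomes repetitive. The sketch below shows one way to build an entry in the same shape as the all_functions items above. make_tool() is a hypothetical helper introduced for illustration, and it assumes that every declared property is required unless stated otherwise.

```python
def make_tool(name, description, properties, required=None):
    """Build one entry for the tools list in the shape used by all_functions."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                # Assumption: every declared property is required by default.
                "required": list(properties) if required is None else list(required),
            },
        },
    }


weather_tool = make_tool(
    name="weather_report",
    description="It will return weather report in json format for given location co-ordinates.",
    properties={
        "latitude": {
            "type": "string",
            "description": "The latitude of a location with 0.01 degree",
        },
        "longitude": {
            "type": "string",
            "description": "The longitude of a location with 0.01 degree",
        },
    },
)
print(weather_tool["function"]["parameters"]["required"])
```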

Call OpenAI Chat Completion API

Let’s pass the all_functions list defined above to the Chat Completion API. Add the snippet below to run_conversation().

    messages = []
    messages.append(
        {
            "role": "system",
            "content": "If no data received from any function. Just say there is issue fetching details from function(function_name).",
        }
    )

    messages.append(
        {
            "role": "user",
            "content": query,
        }
    )

    response = client.chat.completions.create(
        model=GPT_MODEL,
        messages=messages,
        tools=all_functions,
        tool_choice="auto",
    )

    response_message = response.choices[0].message
    tool_calls = response_message.tool_calls

    print(tool_calls)

tools: The set of all functions.
tool_choice = "auto": This lets the model decide whether to call functions and, if so, which functions to call. We can also force the model to use one or more functions by setting tool_choice to an appropriate value:

  • Set tool_choice: "required" to ensure the model always calls one or more functions.
  • Use tool_choice: {"type": "function", "function": {"name": "my_function"}} to force the model to call a specific function.
  • Set tool_choice: "none" to disable function calling and make the model generate only user-facing messages.
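
The tool_choice values listed above can be produced by a small helper, sketched here for illustration. resolve_tool_choice() is a hypothetical name; it only builds the value to pass as tool_choice and does not call the API.

```python
def resolve_tool_choice(mode="auto", function_name=None):
    """Return a value suitable for the tool_choice parameter."""
    if function_name is not None:
        # Force the model to call one specific function.
        return {"type": "function", "function": {"name": function_name}}
    if mode not in ("auto", "required", "none"):
        raise ValueError(f"Unsupported tool_choice mode: {mode}")
    return mode


print(resolve_tool_choice())
print(resolve_tool_choice(function_name="weather_report"))
```

For example, passing tool_choice=resolve_tool_choice(function_name="weather_report") to client.chat.completions.create() would force a weather lookup regardless of the query.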

Let’s run the Chat Completion API and see whether it selects the proper function.

run_conversation("how many flights got delay")

Response

[ChatCompletionMessageToolCall(id='call_0WcSIBFj3Ekg2tijS5yJJOYu', function=Function(arguments='{"query":"flights delayed"}', name='fetch_from_elasticsearch'), type='function')]

Notice that it detected name='fetch_from_elasticsearch', because we asked a flight-related query and Elasticsearch holds the flight-related data. Let’s try another query.

run_conversation("hows weather in delhi")

Response

[ChatCompletionMessageToolCall(id='call_MKROQ3VnmxK7XOgiEJ6fFXaW', function=Function(arguments='{"latitude":"28.7041","longitude":"77.1025"}', name='weather_report'), type='function')]

The model detected the function name='weather_report' and generated the arguments arguments='{"latitude":"28.7041","longitude":"77.1025"}'. We passed only the city name (Delhi), and the model generated the proper arguments, i.e. the latitude and longitude.

Execute selected function

Let’s execute the detected function with the generated arguments. In this part we simply run the function that the model selected and pass it the generated arguments.

Add the snippet below to run_conversation().

    if tool_calls:

        available_functions = {
            "fetch_from_elasticsearch": fetch_from_elasticsearch,
            "weather_report": weather_report,
        }
        messages.append(response_message)

        for tool_call in tool_calls:

            function_name = tool_call.function.name
            function_to_call = available_functions[function_name]
            function_args = json.loads(tool_call.function.arguments)

            if function_name == "fetch_from_elasticsearch":
                function_response = function_to_call(
                    nl_query=function_args.get("query"),
                )

            if function_name == "weather_report":
                function_response = function_to_call(
                    latitude=function_args.get("latitude"),
                    longitude=function_args.get("longitude"),
                )

            print(function_response)
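
The if-chain above grows with every new function. As a hedged alternative, the sketch below dispatches through a registry that maps each tool name to a callable plus a mapping from the model's argument names to the Python parameter names. The two functions are stubs so the example runs offline, and TOOL_REGISTRY and dispatch() are hypothetical names. Errors are returned as a JSON string so the model can still report the problem.

```python
import json


def fetch_from_elasticsearch(nl_query):
    """Stub standing in for the real function so the sketch runs offline."""
    return json.dumps({"echo": nl_query})


def weather_report(latitude, longitude):
    """Stub standing in for the real function so the sketch runs offline."""
    return json.dumps({"latitude": latitude, "longitude": longitude})


# Each tool name maps to (callable, {model argument name: Python parameter name}).
TOOL_REGISTRY = {
    "fetch_from_elasticsearch": (fetch_from_elasticsearch, {"query": "nl_query"}),
    "weather_report": (weather_report, {"latitude": "latitude", "longitude": "longitude"}),
}


def dispatch(function_name, arguments_json):
    """Run the named tool with model-generated JSON arguments; return a string."""
    if function_name not in TOOL_REGISTRY:
        return json.dumps({"error": f"unknown function {function_name}"})
    func, arg_map = TOOL_REGISTRY[function_name]
    try:
        model_args = json.loads(arguments_json)
        kwargs = {param: model_args[arg] for arg, param in arg_map.items()}
        return func(**kwargs)
    except (json.JSONDecodeError, KeyError, TypeError) as exc:
        # Malformed or missing arguments become an error payload instead of a crash.
        return json.dumps({"error": f"{function_name} failed: {exc!r}"})


print(dispatch("weather_report", '{"latitude":"12.97","longitude":"77.75"}'))
```

Inside the loop, function_response = dispatch(tool_call.function.name, tool_call.function.arguments) would then replace both if blocks.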

Let’s test this part:

run_conversation("hows weather in whitefield, bangalore")

Response

[ChatCompletionMessageToolCall(id='call_BCfdhkRtwmkjqmf2A1jP5k6U', function=Function(arguments='{"latitude":"12.97","longitude":"77.75"}', name='weather_report'), type='function')]

 {
    "latitude": 13.0,
    "longitude": 77.75,
    "generationtime_ms": 0.06604194641113281,
    "utc_offset_seconds": 0,
    "timezone": "GMT",
    "timezone_abbreviation": "GMT",
    "elevation": 873.0,
    "current_units": {
        "time": "iso8601",
        "interval": "seconds",
        "temperature_2m": "\u00b0C",
        "precipitation": "mm",
        "cloud_cover": "%",
        "visibility": "m",
        "wind_speed_10m": "km/h"
    },
    "current": {
        "time": "2024-05-30T21:00",
        "interval": 900,
        "temperature_2m": 24.0,
        "precipitation": 0.0,
        "cloud_cover": 42,
        "visibility": 24140.0,
        "wind_speed_10m": 11.7
    }
}

It detected the function weather_report() and executed it with proper arguments.

Let’s try a flight-related query, where we expect to get data from Elasticsearch.

run_conversation("Average delay for Bangalore flights")

Response

{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 78,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "average_delay": {
            "value": 48.65384615384615
        }
    }
}

Extend the conversation

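We’re getting all the responses in JSON format, which is not really human-readable. Let’s use the GPT model to convert the response into natural language. We’ll pass the function response to the Chat Completion API to extend the conversation.

Add the snippet below to run_conversation(). As the indentation shows, the messages.append() call goes inside the for loop, and the second API call goes after the loop, still inside the if tool_calls block.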

            messages.append(
                {
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": function_response,
                }
            )

        second_response = client.chat.completions.create(
            model=GPT_MODEL,
            messages=messages,
        )

        return second_response.choices[0].message.content

Let’s test the end-to-end flow. I recommend commenting out all the print statements unless you want to keep them for debugging purposes.

i = input("Ask:")
answer = run_conversation(i)
print(answer)

Q1: Average delay for Bangalore flights

The average delay for Bangalore flights is approximately 48.65 minutes.

Q2: last 10 flight delay to Bangalore, show in table

[Figure: Text to Elasticsearch]

The data in the figure comes from Elasticsearch, and the model converted the JSON response into a table.

Q3: How is the climate in Whitefield, Bangalore, and what precautions should I take?

[Figure: Text to Elasticsearch]

The model called the weather_report() function to get information for Whitefield, Bangalore, and added the precautions that should be taken.

Some other questions and answers:

Q4: How's the weather in BKC Mumbai?

The current weather in BKC Mumbai is as follows:

  • Temperature: 31.09°C
  • Humidity: 74.5%
  • Wind Speed: 0.61 m/s, coming from the west-northwest (256.5°)
  • No rain intensity or accumulation reported at the moment.

Q5: Which day of the week do flights experience the most delays?

Flights experience the most delays on Thursdays, based on the aggregation data that counts the total number of delays by day of the week.

Q6: Provide a table showing the count of flight cancellations by country

Here is a table showing the count of flight cancellations by country:

Country                 Count of Cancellations
IT (Italy)              315
US (United States)      253
JP (Japan)              118
CN (China)              97
CA (Canada)             67
DE (Germany)            66
IN (India)              63
GB (United Kingdom)     72
AU (Australia)          56
KR (South Korea)        55

Parallel Function Calling

Newer models such as gpt-4o or gpt-3.5-turbo can call multiple functions in one turn. For example, if we ask "details of last 10 delayed flights for Bangalore in tabular format and describe the current climate there.", we need information from both functions.
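
When the model returns several tool calls in one turn, each call gets its own tool message carrying the matching tool_call_id. The sketch below illustrates that shape offline. fake_tool_call() mimics only the attributes that run_conversation() reads from a tool call, run_tool() is a hypothetical stand-in that echoes its input, and the call ids and coordinates are made-up illustrative values.

```python
import json
from types import SimpleNamespace


def fake_tool_call(call_id, name, arguments):
    """Mimic the attributes read from a tool call: id, function.name, function.arguments."""
    return SimpleNamespace(
        id=call_id,
        function=SimpleNamespace(name=name, arguments=json.dumps(arguments)),
    )


def run_tool(name, args):
    """Hypothetical stand-in that echoes its input instead of calling real services."""
    return json.dumps({"tool": name, "args": args})


def tool_messages_for(tool_calls):
    """Build one tool message per tool call, in the order the calls were returned."""
    messages = []
    for call in tool_calls:
        args = json.loads(call.function.arguments)
        messages.append(
            {
                "tool_call_id": call.id,
                "role": "tool",
                "name": call.function.name,
                "content": run_tool(call.function.name, args),
            }
        )
    return messages


calls = [
    fake_tool_call("call_1", "fetch_from_elasticsearch", {"query": "last 10 delayed flights for Bangalore"}),
    fake_tool_call("call_2", "weather_report", {"latitude": "12.97", "longitude": "77.59"}),
]
replies = tool_messages_for(calls)
print([m["name"] for m in replies])
```

The for loop in run_conversation() already follows this pattern, which is why the same code handles both single and parallel calls.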

[Figure: function calling]

Python Notebook

Find the complete Python notebook on Elasticsearch labs.

Conclusion

Incorporating function calling into your applications using models like GPT-4 or others can significantly enhance their capability and flexibility. By strategically configuring the tool_choice parameter, you can dictate when and how the model interacts with external functions.

It also adds a layer of intelligence on top of your responses. In the example above, I asked for the data in tabular format, and the model automatically converted the JSON into a table. It also added the country name based on the country code.

So function calling not only streamlines complex workflows but also opens up new possibilities for integrating various data sources and APIs, making your applications smarter and more responsive to user needs.
